// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifdef GFLAGS
#include <cinttypes>
#include <cstdio>
#include <limits>
#include <set>
#include <sstream>

#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/object_registry.h"
#include "table/block_based/cachable_entry.h"
#include "util/coding.h"
#include "util/gflags_compat.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

using GFLAGS_NAMESPACE::ParseCommandLineFlags;

static constexpr uint32_t KiB = uint32_t{1} << 10;
static constexpr uint32_t MiB = KiB << 10;
static constexpr uint64_t GiB = MiB << 10;

DEFINE_uint32(threads, 16, "Number of concurrent threads to run.");
DEFINE_uint64(cache_size, 1 * GiB,
              "Number of bytes to use as a cache of uncompressed data.");
DEFINE_uint32(num_shard_bits, 6,
              "Number of shard bits; the cache is sharded 2^n ways.");

DEFINE_double(resident_ratio, 0.25,
              "Ratio of keys fitting in cache to keyspace.");
DEFINE_uint64(ops_per_thread, 2000000U, "Number of operations per thread.");
DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added.");

DEFINE_uint32(skew, 5, "Degree of skew in key selection.");
DEFINE_bool(populate_cache, true, "Populate cache before operations.");

DEFINE_uint32(lookup_insert_percent, 87,
              "Ratio of lookup (+ insert on not found) to total workload "
              "(expressed as a percentage).");
DEFINE_uint32(insert_percent, 2,
              "Ratio of insert to total workload (expressed as a percentage).");
DEFINE_uint32(lookup_percent, 10,
              "Ratio of lookup to total workload (expressed as a percentage).");
DEFINE_uint32(erase_percent, 1,
              "Ratio of erase to total workload (expressed as a percentage).");
DEFINE_bool(gather_stats, false,
            "Whether to periodically simulate gathering block cache stats, "
            "using one more thread.");
DEFINE_uint32(
    gather_stats_sleep_ms, 1000,
    "How many milliseconds to sleep between each gathering of stats.");

DEFINE_uint32(gather_stats_entries_per_lock, 256,
              "For Cache::ApplyToAllEntries.");
DEFINE_bool(skewed, false, "If true, skew the key access distribution.");
#ifndef ROCKSDB_LITE
DEFINE_string(secondary_cache_uri, "",
              "Full URI for creating a custom secondary cache object.");
static std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
#endif  // ROCKSDB_LITE

DEFINE_bool(use_clock_cache, false,
            "Use ClockCache instead of the default LRUCache.");

namespace ROCKSDB_NAMESPACE {

class CacheBench;
namespace {
// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
  explicit SharedState(CacheBench* cache_bench)
      : cv_(&mu_),
        num_initialized_(0),
        start_(false),
        num_done_(0),
        cache_bench_(cache_bench) {}

  ~SharedState() {}

  port::Mutex* GetMutex() { return &mu_; }

  port::CondVar* GetCondVar() { return &cv_; }

  CacheBench* GetCacheBench() const { return cache_bench_; }

  void IncInitialized() { num_initialized_++; }

  void IncDone() { num_done_++; }

  bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; }

  bool AllDone() const { return num_done_ >= FLAGS_threads; }

  void SetStart() { start_ = true; }

  bool Started() const { return start_; }

 private:
  port::Mutex mu_;
  port::CondVar cv_;

  uint64_t num_initialized_;
  bool start_;
  uint64_t num_done_;

  CacheBench* cache_bench_;
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;
  Random64 rnd;
  SharedState* shared;
  HistogramImpl latency_ns_hist;
  uint64_t duration_us = 0;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rnd(1000 + index), shared(_shared) {}
};

struct KeyGen {
  char key_data[27];

  Slice GetRand(Random64& rnd, uint64_t max_key, int max_log) {
    uint64_t key = 0;
    if (!FLAGS_skewed) {
      uint64_t raw = rnd.Next();
      // Skew according to setting
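      // (Keeping the minimum of FLAGS_skew + 1 uniform draws biases the
      // raw value, and thus the key, toward the low end of the range.)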
      for (uint32_t i = 0; i < FLAGS_skew; ++i) {
        raw = std::min(raw, rnd.Next());
      }
      key = FastRange64(raw, max_key);
    } else {
      key = rnd.Skewed(max_log);
      if (key > max_key) {
        key -= max_key;
      }
    }
    // Variable size and alignment
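    // key_data holds three tagged copies of the key, so slicing at
    // offset (key % 8) still yields a unique key while varying the
    // length (20-27 bytes) and alignment of the returned Slice.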
    size_t off = key % 8;
    key_data[0] = char{42};
    EncodeFixed64(key_data + 1, key);
    key_data[9] = char{11};
    EncodeFixed64(key_data + 10, key);
    key_data[18] = char{4};
    EncodeFixed64(key_data + 19, key);
    return Slice(&key_data[off], sizeof(key_data) - off);
  }
};

char* createValue(Random64& rnd) {
  char* rv = new char[FLAGS_value_bytes];
  // Fill with some filler data, and take some CPU time
  for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) {
    EncodeFixed64(rv + i, rnd.Next());
  }
  return rv;
}

// Callbacks for secondary cache
size_t SizeFn(void* /*obj*/) { return FLAGS_value_bytes; }

Status SaveToFn(void* obj, size_t /*offset*/, size_t size, void* out) {
  memcpy(out, obj, size);
  return Status::OK();
}

// Different deleters to simulate using deleter to gather
// stats on the code origin and kind of cache entries.
void deleter1(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}
void deleter2(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}
void deleter3(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}

Cache::CacheItemHelper helper1(SizeFn, SaveToFn, deleter1);
Cache::CacheItemHelper helper2(SizeFn, SaveToFn, deleter2);
Cache::CacheItemHelper helper3(SizeFn, SaveToFn, deleter3);
}  // namespace

class CacheBench {
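  // 1/100th of the uint64_t space; multiplied by the *_percent flags to
  // build cumulative thresholds so one random draw selects the operation.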
  static constexpr uint64_t kHundredthUint64 =
      std::numeric_limits<uint64_t>::max() / 100U;

 public:
  CacheBench()
      : max_key_(static_cast<uint64_t>(FLAGS_cache_size / FLAGS_resident_ratio /
                                       FLAGS_value_bytes)),
        lookup_insert_threshold_(kHundredthUint64 *
                                 FLAGS_lookup_insert_percent),
        insert_threshold_(lookup_insert_threshold_ +
                          kHundredthUint64 * FLAGS_insert_percent),
        lookup_threshold_(insert_threshold_ +
                          kHundredthUint64 * FLAGS_lookup_percent),
        erase_threshold_(lookup_threshold_ +
                         kHundredthUint64 * FLAGS_erase_percent),
        skewed_(FLAGS_skewed) {
    if (erase_threshold_ != 100U * kHundredthUint64) {
      fprintf(stderr, "Percentages must add to 100.\n");
      exit(1);
    }

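    // Compute max_log_ ~= ceil(log2(max_key_)), used by Random64::Skewed
    // when --skewed is set.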
    max_log_ = 0;
    if (skewed_) {
      uint64_t max_key = max_key_;
      while (max_key >>= 1) max_log_++;
      // Round up when max_key_ is not a power of two.
      if (max_key_ > (uint64_t{1} << max_log_)) max_log_++;
    }

    if (FLAGS_use_clock_cache) {
      cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
      if (!cache_) {
        fprintf(stderr, "Clock cache not supported.\n");
        exit(1);
      }
    } else {
      LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5);
#ifndef ROCKSDB_LITE
      if (!FLAGS_secondary_cache_uri.empty()) {
        Status s =
            ObjectRegistry::NewInstance()->NewSharedObject<SecondaryCache>(
                FLAGS_secondary_cache_uri, &secondary_cache);
        if (secondary_cache == nullptr) {
          fprintf(
              stderr,
              "No secondary cache registered matching string: %s status=%s\n",
              FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
          exit(1);
        }
        opts.secondary_cache = secondary_cache;
      }
#endif  // ROCKSDB_LITE

      cache_ = NewLRUCache(opts);
    }
  }

  ~CacheBench() {}

  void PopulateCache() {
    Random64 rnd(1);
    KeyGen keygen;
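    // Insert roughly twice the cache's capacity in values, so the cache
    // starts full and some eviction has already occurred.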
    for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) {
      cache_->Insert(keygen.GetRand(rnd, max_key_, max_log_), createValue(rnd),
                     &helper1, FLAGS_value_bytes);
    }
  }

  bool Run() {
    const auto clock = SystemClock::Default().get();

    PrintEnv();
    SharedState shared(this);
    std::vector<std::unique_ptr<ThreadState>> threads(FLAGS_threads);
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      threads[i].reset(new ThreadState(i, &shared));
      std::thread(ThreadBody, threads[i].get()).detach();
    }
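    // Worker threads are detached; completion is tracked through
    // SharedState's counters and condition variable rather than join().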

    HistogramImpl stats_hist;
    std::string stats_report;
    std::thread stats_thread(StatsBody, &shared, &stats_hist, &stats_report);

    uint64_t start_time;
    {
      MutexLock l(shared.GetMutex());
      while (!shared.AllInitialized()) {
        shared.GetCondVar()->Wait();
      }
      // Record start time
      start_time = clock->NowMicros();

      // Start all threads
      shared.SetStart();
      shared.GetCondVar()->SignalAll();

      // Wait for threads to complete
      while (!shared.AllDone()) {
        shared.GetCondVar()->Wait();
      }
    }

    // Stats gathering is considered background work. This time measurement
    // is for foreground work, and not really ideal for that. See below.
    uint64_t end_time = clock->NowMicros();
    stats_thread.join();

    // Wall clock time - includes idle time if threads
    // finish at different times (not ideal).
    double elapsed_secs = static_cast<double>(end_time - start_time) * 1e-6;
    uint32_t ops_per_sec = static_cast<uint32_t>(
        1.0 * FLAGS_threads * FLAGS_ops_per_thread / elapsed_secs);
    printf("Complete in %.3f s; Rough parallel ops/sec = %u\n", elapsed_secs,
           ops_per_sec);

    // Total time in each thread (more accurate throughput measure)
    elapsed_secs = 0;
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      elapsed_secs += threads[i]->duration_us * 1e-6;
    }
    ops_per_sec = static_cast<uint32_t>(1.0 * FLAGS_threads *
                                        FLAGS_ops_per_thread / elapsed_secs);
    printf("Thread ops/sec = %u\n", ops_per_sec);

    printf("\nOperation latency (ns):\n");
    HistogramImpl combined;
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      combined.Merge(threads[i]->latency_ns_hist);
    }
    printf("%s", combined.ToString().c_str());

    if (FLAGS_gather_stats) {
      printf("\nGather stats latency (us):\n");
      printf("%s", stats_hist.ToString().c_str());
    }

    printf("\n%s", stats_report.c_str());

    return true;
  }

 private:
  std::shared_ptr<Cache> cache_;
  const uint64_t max_key_;
  // Cumulative thresholds in the space of a random uint64_t
  const uint64_t lookup_insert_threshold_;
  const uint64_t insert_threshold_;
  const uint64_t lookup_threshold_;
  const uint64_t erase_threshold_;
  const bool skewed_;
  int max_log_;

  // A benchmark version of gathering stats on an active block cache by
  // iterating over it. The primary purpose is to measure the impact of
  // gathering stats with ApplyToAllEntries on throughput- and
  // latency-sensitive Cache users. Performance of stats gathering is
  // also reported. The last set of gathered stats is also reported, for
  // manual sanity checking for logical errors or other unexpected
  // behavior of cache_bench or the underlying Cache.
  static void StatsBody(SharedState* shared, HistogramImpl* stats_hist,
                        std::string* stats_report) {
    if (!FLAGS_gather_stats) {
      return;
    }
    const auto clock = SystemClock::Default().get();
    uint64_t total_key_size = 0;
    uint64_t total_charge = 0;
    uint64_t total_entry_count = 0;
    std::set<Cache::DeleterFn> deleters;
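    // The set of distinct deleter functions (helper1/2/3 above) stands in
    // for entry categories when gathering stats.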
    StopWatchNano timer(clock);

    for (;;) {
      uint64_t time = clock->NowMicros();
      uint64_t deadline = time + uint64_t{FLAGS_gather_stats_sleep_ms} * 1000;

      {
        MutexLock l(shared->GetMutex());
        for (;;) {
          if (shared->AllDone()) {
            std::ostringstream ostr;
            ostr << "Most recent cache entry stats:\n"
                 << "Number of entries: " << total_entry_count << "\n"
                 << "Total charge: " << BytesToHumanString(total_charge) << "\n"
                 << "Average key size: "
                 << (1.0 * total_key_size / total_entry_count) << "\n"
                 << "Average charge: "
                 << BytesToHumanString(static_cast<uint64_t>(
                        1.0 * total_charge / total_entry_count))
                 << "\n"
                 << "Unique deleters: " << deleters.size() << "\n";
            *stats_report = ostr.str();
            return;
          }
          if (clock->NowMicros() >= deadline) {
            break;
          }
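          // Wake on any signal to re-check AllDone(); otherwise wait out
          // the remainder of the stats interval.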
          uint64_t diff = deadline - std::min(clock->NowMicros(), deadline);
          shared->GetCondVar()->TimedWait(diff + 1);
        }
      }

      // Now gather stats, outside of mutex
      total_key_size = 0;
      total_charge = 0;
      total_entry_count = 0;
      deleters.clear();
      auto fn = [&](const Slice& key, void* /*value*/, size_t charge,
                    Cache::DeleterFn deleter) {
        total_key_size += key.size();
        total_charge += charge;
        ++total_entry_count;
        // Something slightly more expensive as in (future) stats by category
        deleters.insert(deleter);
      };
      timer.Start();
      Cache::ApplyToAllEntriesOptions opts;
      opts.average_entries_per_lock = FLAGS_gather_stats_entries_per_lock;
      shared->GetCacheBench()->cache_->ApplyToAllEntries(fn, opts);
      stats_hist->Add(timer.ElapsedNanos() / 1000);
    }
  }

  static void ThreadBody(ThreadState* thread) {
    SharedState* shared = thread->shared;

    {
      MutexLock l(shared->GetMutex());
      shared->IncInitialized();
      if (shared->AllInitialized()) {
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->Started()) {
        shared->GetCondVar()->Wait();
      }
    }
    thread->shared->GetCacheBench()->OperateCache(thread);

    {
      MutexLock l(shared->GetMutex());
      shared->IncDone();
      if (shared->AllDone()) {
        shared->GetCondVar()->SignalAll();
      }
    }
  }

  void OperateCache(ThreadState* thread) {
    // To use looked-up values
    uint64_t result = 0;
    // To hold handles for a non-trivial amount of time
    Cache::Handle* handle = nullptr;
    KeyGen gen;
    const auto clock = SystemClock::Default().get();
    uint64_t start_time = clock->NowMicros();
    StopWatchNano timer(clock);

    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
      timer.Start();
      Slice key = gen.GetRand(thread->rnd, max_key_, max_log_);
      uint64_t random_op = thread->rnd.Next();
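      // create_cb materializes a value from saved data; Lookup can invoke
      // it when an entry is served from the secondary cache.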
      Cache::CreateCallback create_cb = [](void* buf, size_t size,
                                           void** out_obj,
                                           size_t* charge) -> Status {
        *out_obj = reinterpret_cast<void*>(new char[size]);
        memcpy(*out_obj, buf, size);
        *charge = size;
        return Status::OK();
      };

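      // Select the operation by comparing one uniform draw against the
      // cumulative thresholds: lookup+insert, insert, lookup, then erase.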
      if (random_op < lookup_insert_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do lookup
        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
                                true);
        if (handle) {
          // do something with the data
          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
                             FLAGS_value_bytes);
        } else {
          // do insert
          cache_->Insert(key, createValue(thread->rnd), &helper2,
                         FLAGS_value_bytes, &handle);
        }
      } else if (random_op < insert_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do insert
        cache_->Insert(key, createValue(thread->rnd), &helper3,
                       FLAGS_value_bytes, &handle);
      } else if (random_op < lookup_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do lookup
        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
                                true);
        if (handle) {
          // do something with the data
          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
                             FLAGS_value_bytes);
        }
      } else if (random_op < erase_threshold_) {
        // do erase
        cache_->Erase(key);
      } else {
        // Should be extremely unlikely (noop)
        assert(random_op >= kHundredthUint64 * 100U);
      }
      thread->latency_ns_hist.Add(timer.ElapsedNanos());
    }
    if (handle) {
      cache_->Release(handle);
      handle = nullptr;
    }
    // Ensure computations on `result` are not optimized away.
    if (result == 1) {
      printf("You are extremely unlucky(2). Try again.\n");
      exit(1);
    }
    thread->duration_us = clock->NowMicros() - start_time;
  }

  void PrintEnv() const {
    printf("RocksDB version     : %d.%d\n", kMajorVersion, kMinorVersion);
    printf("Number of threads   : %u\n", FLAGS_threads);
    printf("Ops per thread      : %" PRIu64 "\n", FLAGS_ops_per_thread);
    printf("Cache size          : %s\n",
           BytesToHumanString(FLAGS_cache_size).c_str());
    printf("Num shard bits      : %u\n", FLAGS_num_shard_bits);
    printf("Max key             : %" PRIu64 "\n", max_key_);
    printf("Resident ratio      : %g\n", FLAGS_resident_ratio);
    printf("Skew degree         : %u\n", FLAGS_skew);
    printf("Populate cache      : %d\n", int{FLAGS_populate_cache});
    printf("Lookup+Insert pct   : %u%%\n", FLAGS_lookup_insert_percent);
    printf("Insert percentage   : %u%%\n", FLAGS_insert_percent);
    printf("Lookup percentage   : %u%%\n", FLAGS_lookup_percent);
    printf("Erase percentage    : %u%%\n", FLAGS_erase_percent);
    std::ostringstream stats;
    if (FLAGS_gather_stats) {
      stats << "enabled (" << FLAGS_gather_stats_sleep_ms << "ms, "
            << FLAGS_gather_stats_entries_per_lock << "/lock)";
    } else {
      stats << "disabled";
    }
    printf("Gather stats        : %s\n", stats.str().c_str());
    printf("----------------------------\n");
  }
};

int cache_bench_tool(int argc, char** argv) {
  ParseCommandLineFlags(&argc, &argv, true);

  if (FLAGS_threads <= 0) {
    fprintf(stderr, "Invalid number of threads: must be > 0\n");
    exit(1);
  }

  ROCKSDB_NAMESPACE::CacheBench bench;
  if (FLAGS_populate_cache) {
    bench.PopulateCache();
    printf("Population complete\n");
    printf("----------------------------\n");
  }
  if (bench.Run()) {
    return 0;
  } else {
    return 1;
  }
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // GFLAGS