1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifdef GFLAGS
7 #include <cinttypes>
8 #include <cstdio>
9 #include <limits>
10 #include <set>
11 #include <sstream>
12
13 #include "monitoring/histogram.h"
14 #include "port/port.h"
15 #include "rocksdb/cache.h"
16 #include "rocksdb/convenience.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/secondary_cache.h"
20 #include "rocksdb/system_clock.h"
21 #include "table/block_based/cachable_entry.h"
22 #include "util/coding.h"
23 #include "util/gflags_compat.h"
24 #include "util/hash.h"
25 #include "util/mutexlock.h"
26 #include "util/random.h"
27 #include "util/stop_watch.h"
28 #include "util/string_util.h"
29
30 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
31
// Binary size units. KiB and MiB fit in 32 bits; GiB is declared 64-bit so
// that flag defaults written as multiples of GiB are computed in 64 bits.
static constexpr uint32_t KiB = 1024U;
static constexpr uint32_t MiB = 1024U * KiB;
static constexpr uint64_t GiB = uint64_t{1024U} * MiB;
35
36 DEFINE_uint32(threads, 16, "Number of concurrent threads to run.");
37 DEFINE_uint64(cache_size, 1 * GiB,
38 "Number of bytes to use as a cache of uncompressed data.");
39 DEFINE_uint32(num_shard_bits, 6, "shard_bits.");
40
41 DEFINE_double(resident_ratio, 0.25,
42 "Ratio of keys fitting in cache to keyspace.");
43 DEFINE_uint64(ops_per_thread, 2000000U, "Number of operations per thread.");
44 DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added.");
45
46 DEFINE_uint32(skew, 5, "Degree of skew in key selection");
47 DEFINE_bool(populate_cache, true, "Populate cache before operations");
48
49 DEFINE_uint32(lookup_insert_percent, 87,
50 "Ratio of lookup (+ insert on not found) to total workload "
51 "(expressed as a percentage)");
52 DEFINE_uint32(insert_percent, 2,
53 "Ratio of insert to total workload (expressed as a percentage)");
54 DEFINE_uint32(lookup_percent, 10,
55 "Ratio of lookup to total workload (expressed as a percentage)");
56 DEFINE_uint32(erase_percent, 1,
57 "Ratio of erase to total workload (expressed as a percentage)");
58 DEFINE_bool(gather_stats, false,
59 "Whether to periodically simulate gathering block cache stats, "
60 "using one more thread.");
61 DEFINE_uint32(
62 gather_stats_sleep_ms, 1000,
63 "How many milliseconds to sleep between each gathering of stats.");
64
65 DEFINE_uint32(gather_stats_entries_per_lock, 256,
66 "For Cache::ApplyToAllEntries");
67 DEFINE_bool(skewed, false, "If true, skew the key access distribution");
68 #ifndef ROCKSDB_LITE
69 DEFINE_string(secondary_cache_uri, "",
70 "Full URI for creating a custom secondary cache object");
71 static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
72 #endif // ROCKSDB_LITE
73
74 DEFINE_bool(use_clock_cache, false, "");
75
76 namespace ROCKSDB_NAMESPACE {
77
78 class CacheBench;
79 namespace {
80 // State shared by all concurrent executions of the same benchmark.
81 class SharedState {
82 public:
SharedState(CacheBench * cache_bench)83 explicit SharedState(CacheBench* cache_bench)
84 : cv_(&mu_),
85 num_initialized_(0),
86 start_(false),
87 num_done_(0),
88 cache_bench_(cache_bench) {}
89
~SharedState()90 ~SharedState() {}
91
GetMutex()92 port::Mutex* GetMutex() { return &mu_; }
93
GetCondVar()94 port::CondVar* GetCondVar() { return &cv_; }
95
GetCacheBench() const96 CacheBench* GetCacheBench() const { return cache_bench_; }
97
IncInitialized()98 void IncInitialized() { num_initialized_++; }
99
IncDone()100 void IncDone() { num_done_++; }
101
AllInitialized() const102 bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; }
103
AllDone() const104 bool AllDone() const { return num_done_ >= FLAGS_threads; }
105
SetStart()106 void SetStart() { start_ = true; }
107
Started() const108 bool Started() const { return start_; }
109
110 private:
111 port::Mutex mu_;
112 port::CondVar cv_;
113
114 uint64_t num_initialized_;
115 bool start_;
116 uint64_t num_done_;
117
118 CacheBench* cache_bench_;
119 };
120
121 // Per-thread state for concurrent executions of the same benchmark.
122 struct ThreadState {
123 uint32_t tid;
124 Random64 rnd;
125 SharedState* shared;
126 HistogramImpl latency_ns_hist;
127 uint64_t duration_us = 0;
128
ThreadStateROCKSDB_NAMESPACE::__anone0cec4ae0111::ThreadState129 ThreadState(uint32_t index, SharedState* _shared)
130 : tid(index), rnd(1000 + index), shared(_shared) {}
131 };
132
133 struct KeyGen {
134 char key_data[27];
135
GetRandROCKSDB_NAMESPACE::__anone0cec4ae0111::KeyGen136 Slice GetRand(Random64& rnd, uint64_t max_key, int max_log) {
137 uint64_t key = 0;
138 if (!FLAGS_skewed) {
139 uint64_t raw = rnd.Next();
140 // Skew according to setting
141 for (uint32_t i = 0; i < FLAGS_skew; ++i) {
142 raw = std::min(raw, rnd.Next());
143 }
144 key = FastRange64(raw, max_key);
145 } else {
146 key = rnd.Skewed(max_log);
147 if (key > max_key) {
148 key -= max_key;
149 }
150 }
151 // Variable size and alignment
152 size_t off = key % 8;
153 key_data[0] = char{42};
154 EncodeFixed64(key_data + 1, key);
155 key_data[9] = char{11};
156 EncodeFixed64(key_data + 10, key);
157 key_data[18] = char{4};
158 EncodeFixed64(key_data + 19, key);
159 return Slice(&key_data[off], sizeof(key_data) - off);
160 }
161 };
162
createValue(Random64 & rnd)163 char* createValue(Random64& rnd) {
164 char* rv = new char[FLAGS_value_bytes];
165 // Fill with some filler data, and take some CPU time
166 for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) {
167 EncodeFixed64(rv + i, rnd.Next());
168 }
169 return rv;
170 }
171
172 // Callbacks for secondary cache
SizeFn(void *)173 size_t SizeFn(void* /*obj*/) { return FLAGS_value_bytes; }
174
SaveToFn(void * obj,size_t,size_t size,void * out)175 Status SaveToFn(void* obj, size_t /*offset*/, size_t size, void* out) {
176 memcpy(out, obj, size);
177 return Status::OK();
178 }
179
180 // Different deleters to simulate using deleter to gather
181 // stats on the code origin and kind of cache entries.
deleter1(const Slice &,void * value)182 void deleter1(const Slice& /*key*/, void* value) {
183 delete[] static_cast<char*>(value);
184 }
deleter2(const Slice &,void * value)185 void deleter2(const Slice& /*key*/, void* value) {
186 delete[] static_cast<char*>(value);
187 }
deleter3(const Slice &,void * value)188 void deleter3(const Slice& /*key*/, void* value) {
189 delete[] static_cast<char*>(value);
190 }
191
192 Cache::CacheItemHelper helper1(SizeFn, SaveToFn, deleter1);
193 Cache::CacheItemHelper helper2(SizeFn, SaveToFn, deleter2);
194 Cache::CacheItemHelper helper3(SizeFn, SaveToFn, deleter3);
195 } // namespace
196
197 class CacheBench {
198 static constexpr uint64_t kHundredthUint64 =
199 std::numeric_limits<uint64_t>::max() / 100U;
200
201 public:
CacheBench()202 CacheBench()
203 : max_key_(static_cast<uint64_t>(FLAGS_cache_size / FLAGS_resident_ratio /
204 FLAGS_value_bytes)),
205 lookup_insert_threshold_(kHundredthUint64 *
206 FLAGS_lookup_insert_percent),
207 insert_threshold_(lookup_insert_threshold_ +
208 kHundredthUint64 * FLAGS_insert_percent),
209 lookup_threshold_(insert_threshold_ +
210 kHundredthUint64 * FLAGS_lookup_percent),
211 erase_threshold_(lookup_threshold_ +
212 kHundredthUint64 * FLAGS_erase_percent),
213 skewed_(FLAGS_skewed) {
214 if (erase_threshold_ != 100U * kHundredthUint64) {
215 fprintf(stderr, "Percentages must add to 100.\n");
216 exit(1);
217 }
218
219 max_log_ = 0;
220 if (skewed_) {
221 uint64_t max_key = max_key_;
222 while (max_key >>= 1) max_log_++;
223 if (max_key > (1u << max_log_)) max_log_++;
224 }
225
226 if (FLAGS_use_clock_cache) {
227 cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
228 if (!cache_) {
229 fprintf(stderr, "Clock cache not supported.\n");
230 exit(1);
231 }
232 } else {
233 LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5);
234 #ifndef ROCKSDB_LITE
235 if (!FLAGS_secondary_cache_uri.empty()) {
236 Status s = SecondaryCache::CreateFromString(
237 ConfigOptions(), FLAGS_secondary_cache_uri, &secondary_cache);
238 if (secondary_cache == nullptr) {
239 fprintf(
240 stderr,
241 "No secondary cache registered matching string: %s status=%s\n",
242 FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
243 exit(1);
244 }
245 opts.secondary_cache = secondary_cache;
246 }
247 #endif // ROCKSDB_LITE
248
249 cache_ = NewLRUCache(opts);
250 }
251 }
252
// Nothing to release by hand; the members clean up after themselves.
~CacheBench() = default;
254
PopulateCache()255 void PopulateCache() {
256 Random64 rnd(1);
257 KeyGen keygen;
258 for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) {
259 cache_->Insert(keygen.GetRand(rnd, max_key_, max_log_), createValue(rnd),
260 &helper1, FLAGS_value_bytes);
261 }
262 }
263
Run()264 bool Run() {
265 const auto clock = SystemClock::Default().get();
266
267 PrintEnv();
268 SharedState shared(this);
269 std::vector<std::unique_ptr<ThreadState> > threads(FLAGS_threads);
270 for (uint32_t i = 0; i < FLAGS_threads; i++) {
271 threads[i].reset(new ThreadState(i, &shared));
272 std::thread(ThreadBody, threads[i].get()).detach();
273 }
274
275 HistogramImpl stats_hist;
276 std::string stats_report;
277 std::thread stats_thread(StatsBody, &shared, &stats_hist, &stats_report);
278
279 uint64_t start_time;
280 {
281 MutexLock l(shared.GetMutex());
282 while (!shared.AllInitialized()) {
283 shared.GetCondVar()->Wait();
284 }
285 // Record start time
286 start_time = clock->NowMicros();
287
288 // Start all threads
289 shared.SetStart();
290 shared.GetCondVar()->SignalAll();
291
292 // Wait threads to complete
293 while (!shared.AllDone()) {
294 shared.GetCondVar()->Wait();
295 }
296 }
297
298 // Stats gathering is considered background work. This time measurement
299 // is for foreground work, and not really ideal for that. See below.
300 uint64_t end_time = clock->NowMicros();
301 stats_thread.join();
302
303 // Wall clock time - includes idle time if threads
304 // finish at different times (not ideal).
305 double elapsed_secs = static_cast<double>(end_time - start_time) * 1e-6;
306 uint32_t ops_per_sec = static_cast<uint32_t>(
307 1.0 * FLAGS_threads * FLAGS_ops_per_thread / elapsed_secs);
308 printf("Complete in %.3f s; Rough parallel ops/sec = %u\n", elapsed_secs,
309 ops_per_sec);
310
311 // Total time in each thread (more accurate throughput measure)
312 elapsed_secs = 0;
313 for (uint32_t i = 0; i < FLAGS_threads; i++) {
314 elapsed_secs += threads[i]->duration_us * 1e-6;
315 }
316 ops_per_sec = static_cast<uint32_t>(1.0 * FLAGS_threads *
317 FLAGS_ops_per_thread / elapsed_secs);
318 printf("Thread ops/sec = %u\n", ops_per_sec);
319
320 printf("\nOperation latency (ns):\n");
321 HistogramImpl combined;
322 for (uint32_t i = 0; i < FLAGS_threads; i++) {
323 combined.Merge(threads[i]->latency_ns_hist);
324 }
325 printf("%s", combined.ToString().c_str());
326
327 if (FLAGS_gather_stats) {
328 printf("\nGather stats latency (us):\n");
329 printf("%s", stats_hist.ToString().c_str());
330 }
331
332 printf("\n%s", stats_report.c_str());
333
334 return true;
335 }
336
337 private:
338 std::shared_ptr<Cache> cache_;
339 const uint64_t max_key_;
340 // Cumulative thresholds in the space of a random uint64_t
341 const uint64_t lookup_insert_threshold_;
342 const uint64_t insert_threshold_;
343 const uint64_t lookup_threshold_;
344 const uint64_t erase_threshold_;
345 const bool skewed_;
346 int max_log_;
347
348 // A benchmark version of gathering stats on an active block cache by
349 // iterating over it. The primary purpose is to measure the impact of
350 // gathering stats with ApplyToAllEntries on throughput- and
351 // latency-sensitive Cache users. Performance of stats gathering is
352 // also reported. The last set of gathered stats is also reported, for
353 // manual sanity checking for logical errors or other unexpected
354 // behavior of cache_bench or the underlying Cache.
StatsBody(SharedState * shared,HistogramImpl * stats_hist,std::string * stats_report)355 static void StatsBody(SharedState* shared, HistogramImpl* stats_hist,
356 std::string* stats_report) {
357 if (!FLAGS_gather_stats) {
358 return;
359 }
360 const auto clock = SystemClock::Default().get();
361 uint64_t total_key_size = 0;
362 uint64_t total_charge = 0;
363 uint64_t total_entry_count = 0;
364 std::set<Cache::DeleterFn> deleters;
365 StopWatchNano timer(clock);
366
367 for (;;) {
368 uint64_t time;
369 time = clock->NowMicros();
370 uint64_t deadline = time + uint64_t{FLAGS_gather_stats_sleep_ms} * 1000;
371
372 {
373 MutexLock l(shared->GetMutex());
374 for (;;) {
375 if (shared->AllDone()) {
376 std::ostringstream ostr;
377 ostr << "Most recent cache entry stats:\n"
378 << "Number of entries: " << total_entry_count << "\n"
379 << "Total charge: " << BytesToHumanString(total_charge) << "\n"
380 << "Average key size: "
381 << (1.0 * total_key_size / total_entry_count) << "\n"
382 << "Average charge: "
383 << BytesToHumanString(1.0 * total_charge / total_entry_count)
384 << "\n"
385 << "Unique deleters: " << deleters.size() << "\n";
386 *stats_report = ostr.str();
387 return;
388 }
389 if (clock->NowMicros() >= deadline) {
390 break;
391 }
392 uint64_t diff = deadline - std::min(clock->NowMicros(), deadline);
393 shared->GetCondVar()->TimedWait(diff + 1);
394 }
395 }
396
397 // Now gather stats, outside of mutex
398 total_key_size = 0;
399 total_charge = 0;
400 total_entry_count = 0;
401 deleters.clear();
402 auto fn = [&](const Slice& key, void* /*value*/, size_t charge,
403 Cache::DeleterFn deleter) {
404 total_key_size += key.size();
405 total_charge += charge;
406 ++total_entry_count;
407 // Something slightly more expensive as in (future) stats by category
408 deleters.insert(deleter);
409 };
410 timer.Start();
411 Cache::ApplyToAllEntriesOptions opts;
412 opts.average_entries_per_lock = FLAGS_gather_stats_entries_per_lock;
413 shared->GetCacheBench()->cache_->ApplyToAllEntries(fn, opts);
414 stats_hist->Add(timer.ElapsedNanos() / 1000);
415 }
416 }
417
ThreadBody(ThreadState * thread)418 static void ThreadBody(ThreadState* thread) {
419 SharedState* shared = thread->shared;
420
421 {
422 MutexLock l(shared->GetMutex());
423 shared->IncInitialized();
424 if (shared->AllInitialized()) {
425 shared->GetCondVar()->SignalAll();
426 }
427 while (!shared->Started()) {
428 shared->GetCondVar()->Wait();
429 }
430 }
431 thread->shared->GetCacheBench()->OperateCache(thread);
432
433 {
434 MutexLock l(shared->GetMutex());
435 shared->IncDone();
436 if (shared->AllDone()) {
437 shared->GetCondVar()->SignalAll();
438 }
439 }
440 }
441
OperateCache(ThreadState * thread)442 void OperateCache(ThreadState* thread) {
443 // To use looked-up values
444 uint64_t result = 0;
445 // To hold handles for a non-trivial amount of time
446 Cache::Handle* handle = nullptr;
447 KeyGen gen;
448 const auto clock = SystemClock::Default().get();
449 uint64_t start_time = clock->NowMicros();
450 StopWatchNano timer(clock);
451
452 for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
453 timer.Start();
454 Slice key = gen.GetRand(thread->rnd, max_key_, max_log_);
455 uint64_t random_op = thread->rnd.Next();
456 Cache::CreateCallback create_cb =
457 [](void* buf, size_t size, void** out_obj, size_t* charge) -> Status {
458 *out_obj = reinterpret_cast<void*>(new char[size]);
459 memcpy(*out_obj, buf, size);
460 *charge = size;
461 return Status::OK();
462 };
463
464 if (random_op < lookup_insert_threshold_) {
465 if (handle) {
466 cache_->Release(handle);
467 handle = nullptr;
468 }
469 // do lookup
470 handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
471 true);
472 if (handle) {
473 // do something with the data
474 result += NPHash64(static_cast<char*>(cache_->Value(handle)),
475 FLAGS_value_bytes);
476 } else {
477 // do insert
478 cache_->Insert(key, createValue(thread->rnd), &helper2,
479 FLAGS_value_bytes, &handle);
480 }
481 } else if (random_op < insert_threshold_) {
482 if (handle) {
483 cache_->Release(handle);
484 handle = nullptr;
485 }
486 // do insert
487 cache_->Insert(key, createValue(thread->rnd), &helper3,
488 FLAGS_value_bytes, &handle);
489 } else if (random_op < lookup_threshold_) {
490 if (handle) {
491 cache_->Release(handle);
492 handle = nullptr;
493 }
494 // do lookup
495 handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
496 true);
497 if (handle) {
498 // do something with the data
499 result += NPHash64(static_cast<char*>(cache_->Value(handle)),
500 FLAGS_value_bytes);
501 }
502 } else if (random_op < erase_threshold_) {
503 // do erase
504 cache_->Erase(key);
505 } else {
506 // Should be extremely unlikely (noop)
507 assert(random_op >= kHundredthUint64 * 100U);
508 }
509 thread->latency_ns_hist.Add(timer.ElapsedNanos());
510 }
511 if (handle) {
512 cache_->Release(handle);
513 handle = nullptr;
514 }
515 // Ensure computations on `result` are not optimized away.
516 if (result == 1) {
517 printf("You are extremely unlucky(2). Try again.\n");
518 exit(1);
519 }
520 thread->duration_us = clock->NowMicros() - start_time;
521 }
522
PrintEnv() const523 void PrintEnv() const {
524 printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion);
525 printf("Number of threads : %u\n", FLAGS_threads);
526 printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread);
527 printf("Cache size : %s\n",
528 BytesToHumanString(FLAGS_cache_size).c_str());
529 printf("Num shard bits : %u\n", FLAGS_num_shard_bits);
530 printf("Max key : %" PRIu64 "\n", max_key_);
531 printf("Resident ratio : %g\n", FLAGS_resident_ratio);
532 printf("Skew degree : %u\n", FLAGS_skew);
533 printf("Populate cache : %d\n", int{FLAGS_populate_cache});
534 printf("Lookup+Insert pct : %u%%\n", FLAGS_lookup_insert_percent);
535 printf("Insert percentage : %u%%\n", FLAGS_insert_percent);
536 printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent);
537 printf("Erase percentage : %u%%\n", FLAGS_erase_percent);
538 std::ostringstream stats;
539 if (FLAGS_gather_stats) {
540 stats << "enabled (" << FLAGS_gather_stats_sleep_ms << "ms, "
541 << FLAGS_gather_stats_entries_per_lock << "/lock)";
542 } else {
543 stats << "disabled";
544 }
545 printf("Gather stats : %s\n", stats.str().c_str());
546 printf("----------------------------\n");
547 }
548 };
549
cache_bench_tool(int argc,char ** argv)550 int cache_bench_tool(int argc, char** argv) {
551 ParseCommandLineFlags(&argc, &argv, true);
552
553 if (FLAGS_threads <= 0) {
554 fprintf(stderr, "threads number <= 0\n");
555 exit(1);
556 }
557
558 ROCKSDB_NAMESPACE::CacheBench bench;
559 if (FLAGS_populate_cache) {
560 bench.PopulateCache();
561 printf("Population complete\n");
562 printf("----------------------------\n");
563 }
564 if (bench.Run()) {
565 return 0;
566 } else {
567 return 1;
568 }
569 } // namespace ROCKSDB_NAMESPACE
570 } // namespace ROCKSDB_NAMESPACE
571
572 #endif // GFLAGS
573