1 // Copyright (c) 2013, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 #pragma once 10 11 #ifndef ROCKSDB_LITE 12 13 #include <functional> 14 #include <limits> 15 #include <list> 16 #include <memory> 17 #include <string> 18 #include <thread> 19 #include <vector> 20 21 #include "db/db_test_util.h" 22 #include "memory/arena.h" 23 #include "port/port.h" 24 #include "rocksdb/cache.h" 25 #include "table/block_based/block_builder.h" 26 #include "test_util/testharness.h" 27 #include "util/random.h" 28 #include "utilities/persistent_cache/volatile_tier_impl.h" 29 30 namespace ROCKSDB_NAMESPACE { 31 32 // 33 // Unit tests for testing PersistentCacheTier 34 // 35 class PersistentCacheTierTest : public testing::Test { 36 public: 37 PersistentCacheTierTest(); ~PersistentCacheTierTest()38 virtual ~PersistentCacheTierTest() { 39 if (cache_) { 40 Status s = cache_->Close(); 41 assert(s.ok()); 42 } 43 } 44 45 protected: 46 // Flush cache Flush()47 void Flush() { 48 if (cache_) { 49 cache_->TEST_Flush(); 50 } 51 } 52 53 // create threaded workload 54 template <class T> SpawnThreads(const size_t n,const T & fn)55 std::list<port::Thread> SpawnThreads(const size_t n, const T& fn) { 56 std::list<port::Thread> threads; 57 for (size_t i = 0; i < n; i++) { 58 port::Thread th(fn); 59 threads.push_back(std::move(th)); 60 } 61 return threads; 62 } 63 64 // Wait for threads to join Join(std::list<port::Thread> && threads)65 void Join(std::list<port::Thread>&& threads) { 66 for (auto& th : threads) { 67 th.join(); 68 } 69 threads.clear(); 70 } 71 72 // Run insert workload in threads Insert(const size_t nthreads,const size_t max_keys)73 void Insert(const size_t nthreads, const size_t max_keys) { 74 key_ = 0; 75 max_keys_ = max_keys; 76 // spawn threads 77 auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this); 78 auto threads = SpawnThreads(nthreads, fn); 79 // join with threads 80 Join(std::move(threads)); 81 // Flush cache 82 Flush(); 83 } 84 85 // Run verification on the cache 86 void Verify(const size_t nthreads = 1, const bool eviction_enabled = false) { 87 stats_verify_hits_ = 0; 88 stats_verify_missed_ = 0; 89 key_ = 0; 90 // spawn threads 91 auto fn = 92 std::bind(&PersistentCacheTierTest::VerifyImpl, this, eviction_enabled); 93 auto threads = SpawnThreads(nthreads, fn); 94 // join with threads 95 Join(std::move(threads)); 96 } 97 98 // pad 0 to numbers PaddedNumber(const size_t data,const size_t pad_size)99 std::string PaddedNumber(const size_t data, const size_t pad_size) { 100 assert(pad_size); 101 char* ret = new char[pad_size]; 102 int pos = static_cast<int>(pad_size) - 1; 103 size_t count = 0; 104 size_t t = data; 105 // copy numbers 106 while (t) { 107 count++; 108 ret[pos--] = '0' + t % 10; 109 t = t / 10; 110 } 111 // copy 0s 112 while (pos >= 0) { 113 ret[pos--] = '0'; 114 } 115 // post condition 116 assert(count <= pad_size); 117 assert(pos == -1); 118 std::string result(ret, pad_size); 119 delete[] ret; 120 return result; 121 } 122 123 // Insert workload implementation InsertImpl()124 void InsertImpl() { 125 const std::string prefix = "key_prefix_"; 126 127 while (true) { 128 size_t i = key_++; 129 if (i >= max_keys_) { 130 break; 131 } 132 133 char data[4 * 1024]; 134 memset(data, '0' + (i % 10), sizeof(data)); 135 auto k = prefix + PaddedNumber(i, /*count=*/8); 136 Slice key(k); 137 while (true) { 138 Status status = cache_->Insert(key, data, sizeof(data)); 139 if (status.ok()) { 140 break; 141 } 142 ASSERT_TRUE(status.IsTryAgain()); 143 Env::Default()->SleepForMicroseconds(1 * 1000 * 1000); 144 } 145 } 146 } 147 148 // Verification implementation 149 void VerifyImpl(const bool eviction_enabled = false) { 150 const std::string prefix = "key_prefix_"; 151 while (true) { 152 size_t i = key_++; 153 if (i >= max_keys_) { 154 break; 155 } 156 157 char edata[4 * 1024]; 158 memset(edata, '0' + (i % 10), sizeof(edata)); 159 auto k = prefix + PaddedNumber(i, /*count=*/8); 160 Slice key(k); 161 std::unique_ptr<char[]> block; 162 size_t block_size; 163 164 if (eviction_enabled) { 165 if (!cache_->Lookup(key, &block, &block_size).ok()) { 166 // assume that the key is evicted 167 stats_verify_missed_++; 168 continue; 169 } 170 } 171 172 ASSERT_OK(cache_->Lookup(key, &block, &block_size)); 173 ASSERT_EQ(block_size, sizeof(edata)); 174 ASSERT_EQ(memcmp(edata, block.get(), sizeof(edata)), 0); 175 stats_verify_hits_++; 176 } 177 } 178 179 // template for insert test RunInsertTest(const size_t nthreads,const size_t max_keys)180 void RunInsertTest(const size_t nthreads, const size_t max_keys) { 181 Insert(nthreads, max_keys); 182 Verify(nthreads); 183 ASSERT_EQ(stats_verify_hits_, max_keys); 184 ASSERT_EQ(stats_verify_missed_, 0); 185 186 ASSERT_OK(cache_->Close()); 187 cache_.reset(); 188 } 189 190 // template for negative insert test RunNegativeInsertTest(const size_t nthreads,const size_t max_keys)191 void RunNegativeInsertTest(const size_t nthreads, const size_t max_keys) { 192 Insert(nthreads, max_keys); 193 Verify(nthreads, /*eviction_enabled=*/true); 194 ASSERT_LT(stats_verify_hits_, max_keys); 195 ASSERT_GT(stats_verify_missed_, 0); 196 197 ASSERT_OK(cache_->Close()); 198 cache_.reset(); 199 } 200 201 // template for insert with eviction test RunInsertTestWithEviction(const size_t nthreads,const size_t max_keys)202 void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) { 203 Insert(nthreads, max_keys); 204 Verify(nthreads, /*eviction_enabled=*/true); 205 ASSERT_EQ(stats_verify_hits_ + stats_verify_missed_, max_keys); 206 ASSERT_GT(stats_verify_hits_, 0); 207 ASSERT_GT(stats_verify_missed_, 0); 208 209 ASSERT_OK(cache_->Close()); 210 cache_.reset(); 211 } 212 213 const std::string path_; 214 std::shared_ptr<Logger> log_; 215 std::shared_ptr<PersistentCacheTier> cache_; 216 std::atomic<size_t> key_{0}; 217 size_t max_keys_ = 0; 218 std::atomic<size_t> stats_verify_hits_{0}; 219 std::atomic<size_t> stats_verify_missed_{0}; 220 }; 221 222 // 223 // RocksDB tests 224 // 225 class PersistentCacheDBTest : public DBTestBase { 226 public: 227 PersistentCacheDBTest(); 228 TestGetTickerCount(const Options & options,Tickers ticker_type)229 static uint64_t TestGetTickerCount(const Options& options, 230 Tickers ticker_type) { 231 return static_cast<uint32_t>( 232 options.statistics->getTickerCount(ticker_type)); 233 } 234 235 // insert data to table Insert(const Options & options,const BlockBasedTableOptions &,const int num_iter,std::vector<std::string> * values)236 void Insert(const Options& options, 237 const BlockBasedTableOptions& /*table_options*/, 238 const int num_iter, std::vector<std::string>* values) { 239 CreateAndReopenWithCF({"pikachu"}, options); 240 // default column family doesn't have block cache 241 Options no_block_cache_opts; 242 no_block_cache_opts.statistics = options.statistics; 243 no_block_cache_opts = CurrentOptions(no_block_cache_opts); 244 BlockBasedTableOptions table_options_no_bc; 245 table_options_no_bc.no_block_cache = true; 246 no_block_cache_opts.table_factory.reset( 247 NewBlockBasedTableFactory(table_options_no_bc)); 248 ReopenWithColumnFamilies( 249 {"default", "pikachu"}, 250 std::vector<Options>({no_block_cache_opts, options})); 251 252 Random rnd(301); 253 254 // Write 8MB (80 values, each 100K) 255 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); 256 std::string str; 257 for (int i = 0; i < num_iter; i++) { 258 if (i % 4 == 0) { // high compression ratio 259 str = rnd.RandomString(1000); 260 } 261 values->push_back(str); 262 ASSERT_OK(Put(1, Key(i), (*values)[i])); 263 } 264 265 // flush all data from memtable so that reads are from block cache 266 ASSERT_OK(Flush(1)); 267 } 268 269 // verify data Verify(const int num_iter,const std::vector<std::string> & values)270 void Verify(const int num_iter, const std::vector<std::string>& values) { 271 for (int j = 0; j < 2; ++j) { 272 for (int i = 0; i < num_iter; i++) { 273 ASSERT_EQ(Get(1, Key(i)), values[i]); 274 } 275 } 276 } 277 278 // test template 279 void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>& 280 new_pcache, 281 const size_t max_keys, const size_t max_usecase); 282 }; 283 284 } // namespace ROCKSDB_NAMESPACE 285 286 #endif 287