1 //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #pragma once
10 
11 #ifndef ROCKSDB_LITE
12 
13 #include <functional>
14 #include <limits>
15 #include <list>
16 #include <memory>
17 #include <string>
18 #include <thread>
19 #include <vector>
20 
21 #include "db/db_test_util.h"
22 #include "memory/arena.h"
23 #include "port/port.h"
24 #include "rocksdb/cache.h"
25 #include "table/block_based/block_builder.h"
26 #include "test_util/testharness.h"
27 #include "util/random.h"
28 #include "utilities/persistent_cache/volatile_tier_impl.h"
29 
30 namespace ROCKSDB_NAMESPACE {
31 
32 //
33 // Unit tests for testing PersistentCacheTier
34 //
35 class PersistentCacheTierTest : public testing::Test {
36  public:
37   PersistentCacheTierTest();
~PersistentCacheTierTest()38   virtual ~PersistentCacheTierTest() {
39     if (cache_) {
40       Status s = cache_->Close();
41       assert(s.ok());
42     }
43   }
44 
45  protected:
46   // Flush cache
Flush()47   void Flush() {
48     if (cache_) {
49       cache_->TEST_Flush();
50     }
51   }
52 
53   // create threaded workload
54   template <class T>
SpawnThreads(const size_t n,const T & fn)55   std::list<port::Thread> SpawnThreads(const size_t n, const T& fn) {
56     std::list<port::Thread> threads;
57     for (size_t i = 0; i < n; i++) {
58       port::Thread th(fn);
59       threads.push_back(std::move(th));
60     }
61     return threads;
62   }
63 
64   // Wait for threads to join
Join(std::list<port::Thread> && threads)65   void Join(std::list<port::Thread>&& threads) {
66     for (auto& th : threads) {
67       th.join();
68     }
69     threads.clear();
70   }
71 
72   // Run insert workload in threads
Insert(const size_t nthreads,const size_t max_keys)73   void Insert(const size_t nthreads, const size_t max_keys) {
74     key_ = 0;
75     max_keys_ = max_keys;
76     // spawn threads
77     auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this);
78     auto threads = SpawnThreads(nthreads, fn);
79     // join with threads
80     Join(std::move(threads));
81     // Flush cache
82     Flush();
83   }
84 
85   // Run verification on the cache
86   void Verify(const size_t nthreads = 1, const bool eviction_enabled = false) {
87     stats_verify_hits_ = 0;
88     stats_verify_missed_ = 0;
89     key_ = 0;
90     // spawn threads
91     auto fn =
92         std::bind(&PersistentCacheTierTest::VerifyImpl, this, eviction_enabled);
93     auto threads = SpawnThreads(nthreads, fn);
94     // join with threads
95     Join(std::move(threads));
96   }
97 
98   // pad 0 to numbers
PaddedNumber(const size_t data,const size_t pad_size)99   std::string PaddedNumber(const size_t data, const size_t pad_size) {
100     assert(pad_size);
101     char* ret = new char[pad_size];
102     int pos = static_cast<int>(pad_size) - 1;
103     size_t count = 0;
104     size_t t = data;
105     // copy numbers
106     while (t) {
107       count++;
108       ret[pos--] = '0' + t % 10;
109       t = t / 10;
110     }
111     // copy 0s
112     while (pos >= 0) {
113       ret[pos--] = '0';
114     }
115     // post condition
116     assert(count <= pad_size);
117     assert(pos == -1);
118     std::string result(ret, pad_size);
119     delete[] ret;
120     return result;
121   }
122 
123   // Insert workload implementation
InsertImpl()124   void InsertImpl() {
125     const std::string prefix = "key_prefix_";
126 
127     while (true) {
128       size_t i = key_++;
129       if (i >= max_keys_) {
130         break;
131       }
132 
133       char data[4 * 1024];
134       memset(data, '0' + (i % 10), sizeof(data));
135       auto k = prefix + PaddedNumber(i, /*count=*/8);
136       Slice key(k);
137       while (true) {
138         Status status = cache_->Insert(key, data, sizeof(data));
139         if (status.ok()) {
140           break;
141         }
142         ASSERT_TRUE(status.IsTryAgain());
143         Env::Default()->SleepForMicroseconds(1 * 1000 * 1000);
144       }
145     }
146   }
147 
148   // Verification implementation
149   void VerifyImpl(const bool eviction_enabled = false) {
150     const std::string prefix = "key_prefix_";
151     while (true) {
152       size_t i = key_++;
153       if (i >= max_keys_) {
154         break;
155       }
156 
157       char edata[4 * 1024];
158       memset(edata, '0' + (i % 10), sizeof(edata));
159       auto k = prefix + PaddedNumber(i, /*count=*/8);
160       Slice key(k);
161       std::unique_ptr<char[]> block;
162       size_t block_size;
163 
164       if (eviction_enabled) {
165         if (!cache_->Lookup(key, &block, &block_size).ok()) {
166           // assume that the key is evicted
167           stats_verify_missed_++;
168           continue;
169         }
170       }
171 
172       ASSERT_OK(cache_->Lookup(key, &block, &block_size));
173       ASSERT_EQ(block_size, sizeof(edata));
174       ASSERT_EQ(memcmp(edata, block.get(), sizeof(edata)), 0);
175       stats_verify_hits_++;
176     }
177   }
178 
179   // template for insert test
RunInsertTest(const size_t nthreads,const size_t max_keys)180   void RunInsertTest(const size_t nthreads, const size_t max_keys) {
181     Insert(nthreads, max_keys);
182     Verify(nthreads);
183     ASSERT_EQ(stats_verify_hits_, max_keys);
184     ASSERT_EQ(stats_verify_missed_, 0);
185 
186     ASSERT_OK(cache_->Close());
187     cache_.reset();
188   }
189 
190   // template for negative insert test
RunNegativeInsertTest(const size_t nthreads,const size_t max_keys)191   void RunNegativeInsertTest(const size_t nthreads, const size_t max_keys) {
192     Insert(nthreads, max_keys);
193     Verify(nthreads, /*eviction_enabled=*/true);
194     ASSERT_LT(stats_verify_hits_, max_keys);
195     ASSERT_GT(stats_verify_missed_, 0);
196 
197     ASSERT_OK(cache_->Close());
198     cache_.reset();
199   }
200 
201   // template for insert with eviction test
RunInsertTestWithEviction(const size_t nthreads,const size_t max_keys)202   void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) {
203     Insert(nthreads, max_keys);
204     Verify(nthreads, /*eviction_enabled=*/true);
205     ASSERT_EQ(stats_verify_hits_ + stats_verify_missed_, max_keys);
206     ASSERT_GT(stats_verify_hits_, 0);
207     ASSERT_GT(stats_verify_missed_, 0);
208 
209     ASSERT_OK(cache_->Close());
210     cache_.reset();
211   }
212 
213   const std::string path_;
214   std::shared_ptr<Logger> log_;
215   std::shared_ptr<PersistentCacheTier> cache_;
216   std::atomic<size_t> key_{0};
217   size_t max_keys_ = 0;
218   std::atomic<size_t> stats_verify_hits_{0};
219   std::atomic<size_t> stats_verify_missed_{0};
220 };
221 
222 //
223 // RocksDB tests
224 //
225 class PersistentCacheDBTest : public DBTestBase {
226  public:
227   PersistentCacheDBTest();
228 
TestGetTickerCount(const Options & options,Tickers ticker_type)229   static uint64_t TestGetTickerCount(const Options& options,
230                                      Tickers ticker_type) {
231     return static_cast<uint32_t>(
232         options.statistics->getTickerCount(ticker_type));
233   }
234 
235   // insert data to table
Insert(const Options & options,const BlockBasedTableOptions &,const int num_iter,std::vector<std::string> * values)236   void Insert(const Options& options,
237               const BlockBasedTableOptions& /*table_options*/,
238               const int num_iter, std::vector<std::string>* values) {
239     CreateAndReopenWithCF({"pikachu"}, options);
240     // default column family doesn't have block cache
241     Options no_block_cache_opts;
242     no_block_cache_opts.statistics = options.statistics;
243     no_block_cache_opts = CurrentOptions(no_block_cache_opts);
244     BlockBasedTableOptions table_options_no_bc;
245     table_options_no_bc.no_block_cache = true;
246     no_block_cache_opts.table_factory.reset(
247         NewBlockBasedTableFactory(table_options_no_bc));
248     ReopenWithColumnFamilies(
249         {"default", "pikachu"},
250         std::vector<Options>({no_block_cache_opts, options}));
251 
252     Random rnd(301);
253 
254     // Write 8MB (80 values, each 100K)
255     ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
256     std::string str;
257     for (int i = 0; i < num_iter; i++) {
258       if (i % 4 == 0) {  // high compression ratio
259         str = rnd.RandomString(1000);
260       }
261       values->push_back(str);
262       ASSERT_OK(Put(1, Key(i), (*values)[i]));
263     }
264 
265     // flush all data from memtable so that reads are from block cache
266     ASSERT_OK(Flush(1));
267   }
268 
269   // verify data
Verify(const int num_iter,const std::vector<std::string> & values)270   void Verify(const int num_iter, const std::vector<std::string>& values) {
271     for (int j = 0; j < 2; ++j) {
272       for (int i = 0; i < num_iter; i++) {
273         ASSERT_EQ(Get(1, Key(i)), values[i]);
274       }
275     }
276   }
277 
278   // test template
279   void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
280                    new_pcache,
281                const size_t max_keys, const size_t max_usecase);
282 };
283 
284 }  // namespace ROCKSDB_NAMESPACE
285 
286 #endif
287