//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#if !defined(GFLAGS) || defined(ROCKSDB_LITE)
#include <cstdio>
int main() {
  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
  return 1;
}
#elif defined(OS_MACOSX) || defined(OS_WIN)
// Block forward_iterator_bench under MAC and Windows
int main() { return 0; }
#else
#include <semaphore.h>
#include <atomic>
#include <bitset>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstring>
#include <limits>
#include <mutex>
#include <queue>
#include <random>
#include <thread>

#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "test_util/testharness.h"
#include "util/gflags_compat.h"

const int MAX_SHARDS = 100000;

DEFINE_int32(writers, 8, "");
DEFINE_int32(readers, 8, "");
DEFINE_int64(rate, 100000, "");
DEFINE_int64(value_size, 300, "");
DEFINE_int64(shards, 1000, "");
DEFINE_int64(memtable_size, 500000000, "");
DEFINE_int64(block_cache_size, 300000000, "");
DEFINE_int64(block_size, 65536, "");
DEFINE_double(runtime, 300.0, "");
DEFINE_bool(cache_only_first, true, "");
DEFINE_bool(iterate_upper_bound, true, "");

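// Global benchmark counters. The 128-byte pads are meant to keep the hot
// atomics on separate cache lines and avoid false sharing between threads.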
struct Stats {
  char pad1[128] __attribute__((__unused__));
  std::atomic<uint64_t> written{0};
  char pad2[128] __attribute__((__unused__));
  std::atomic<uint64_t> read{0};
  std::atomic<uint64_t> cache_misses{0};
  char pad3[128] __attribute__((__unused__));
} stats;

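// Keys are (shard, seqno) pairs stored big-endian, so the default bytewise
// comparator orders them numerically: by shard first, then by seqno.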
struct Key {
  Key() {}
  Key(uint64_t shard_in, uint64_t seqno_in)
      : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}

  uint64_t shard() const { return be64toh(shard_be); }
  uint64_t seqno() const { return be64toh(seqno_be); }

 private:
  uint64_t shard_be;
  uint64_t seqno_be;
} __attribute__((__packed__));

struct Reader;
struct Writer;

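// Per-shard state shared between the writer that appends to the shard and
// the reader that tails it; padded so that the writer-side and reader-side
// fields sit on different cache lines.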
struct ShardState {
  char pad1[128] __attribute__((__unused__));
  std::atomic<uint64_t> last_written{0};
  Writer* writer;
  Reader* reader;
  char pad2[128] __attribute__((__unused__));
  std::atomic<uint64_t> last_read{0};
  std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it;
  std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it_cacheonly;
  Key upper_bound;
  ROCKSDB_NAMESPACE::Slice upper_bound_slice;
  char pad3[128] __attribute__((__unused__));
};

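// A Reader owns one thread that drains a queue of shards with unread
// writes, tailing each shard with a forward (tailing) iterator until it
// catches up to the last written seqno.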
struct Reader {
 public:
  explicit Reader(std::vector<ShardState>* shard_states,
                  ROCKSDB_NAMESPACE::DB* db)
      : shard_states_(shard_states), db_(db) {
    sem_init(&sem_, 0, 0);
    thread_ = port::Thread(&Reader::run, this);
  }

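  // Each sem_post() corresponds either to one shard enqueued by onWrite()
  // or to the shutdown signal from the destructor.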
  void run() {
    while (1) {
      sem_wait(&sem_);
      if (done_.load()) {
        break;
      }

      uint64_t shard;
      {
        std::lock_guard<std::mutex> guard(queue_mutex_);
        assert(!shards_pending_queue_.empty());
        shard = shards_pending_queue_.front();
        shards_pending_queue_.pop();
        shards_pending_set_.reset(shard);
      }
      readOnceFromShard(shard);
    }
  }

  void readOnceFromShard(uint64_t shard) {
    ShardState& state = (*shard_states_)[shard];
    if (!state.it) {
      // Initialize iterators
      ROCKSDB_NAMESPACE::ReadOptions options;
      options.tailing = true;
      if (FLAGS_iterate_upper_bound) {
        state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
        state.upper_bound_slice = ROCKSDB_NAMESPACE::Slice(
            (const char*)&state.upper_bound, sizeof(state.upper_bound));
        options.iterate_upper_bound = &state.upper_bound_slice;
      }

      state.it.reset(db_->NewIterator(options));

      if (FLAGS_cache_only_first) {
        options.read_tier = ROCKSDB_NAMESPACE::ReadTier::kBlockCacheTier;
        state.it_cacheonly.reset(db_->NewIterator(options));
      }
    }

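    // Try the cache-only iterator first (if enabled): it serves reads from
    // the block cache and returns Status::Incomplete() on a miss, in which
    // case we count the miss and fall back to the regular iterator.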
    const uint64_t upto = state.last_written.load();
    for (ROCKSDB_NAMESPACE::Iterator* it :
         {state.it_cacheonly.get(), state.it.get()}) {
      if (it == nullptr) {
        continue;
      }
      if (state.last_read.load() >= upto) {
        break;
      }
      bool need_seek = true;
      for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
        if (need_seek) {
          Key from(shard, state.last_read.load() + 1);
          it->Seek(ROCKSDB_NAMESPACE::Slice((const char*)&from, sizeof(from)));
          need_seek = false;
        } else {
          it->Next();
        }
        if (it->status().IsIncomplete()) {
          ++::stats.cache_misses;
          break;
        }
        assert(it->Valid());
        assert(it->key().size() == sizeof(Key));
        Key key;
        memcpy(&key, it->key().data(), it->key().size());
        // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
        //         shard, seq, key.shard(), key.seqno());
        assert(key.shard() == shard);
        assert(key.seqno() == seq);
        state.last_read.store(seq);
        ++::stats.read;
      }
    }
  }

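  // Called by a Writer after each Put(): enqueue the shard for reading.
  // The bitset deduplicates, so a shard is queued at most once at a time.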
  void onWrite(uint64_t shard) {
    {
      std::lock_guard<std::mutex> guard(queue_mutex_);
      if (!shards_pending_set_.test(shard)) {
        shards_pending_queue_.push(shard);
        shards_pending_set_.set(shard);
        sem_post(&sem_);
      }
    }
  }

  ~Reader() {
    done_.store(true);
    sem_post(&sem_);
    thread_.join();
  }

 private:
  char pad1[128] __attribute__((__unused__));
  std::vector<ShardState>* shard_states_;
  ROCKSDB_NAMESPACE::DB* db_;
  ROCKSDB_NAMESPACE::port::Thread thread_;
  sem_t sem_;
  std::mutex queue_mutex_;
  std::bitset<MAX_SHARDS + 1> shards_pending_set_;
  std::queue<uint64_t> shards_pending_queue_;
  std::atomic<bool> done_{false};
  char pad2[128] __attribute__((__unused__));
};

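// A Writer owns one thread that issues Put()s to randomly chosen shards it
// has been assigned, throttled to roughly FLAGS_rate / FLAGS_writers writes
// per second, and notifies the shard's reader after each write.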
struct Writer {
  explicit Writer(std::vector<ShardState>* shard_states,
                  ROCKSDB_NAMESPACE::DB* db)
      : shard_states_(shard_states), db_(db) {}

  void start() { thread_ = port::Thread(&Writer::run, this); }

  void run() {
    std::queue<std::chrono::steady_clock::time_point> workq;
    std::chrono::steady_clock::time_point deadline(
        std::chrono::steady_clock::now() +
        std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
    std::vector<uint64_t> my_shards;
    for (int i = 1; i <= FLAGS_shards; ++i) {
      if ((*shard_states_)[i].writer == this) {
        my_shards.push_back(i);
      }
    }

    std::mt19937 rng{std::random_device()()};
    std::uniform_int_distribution<int> shard_dist(
        0, static_cast<int>(my_shards.size()) - 1);
    std::string value(FLAGS_value_size, '*');

    while (1) {
      auto now = std::chrono::steady_clock::now();
      if (FLAGS_runtime >= 0 && now >= deadline) {
        break;
      }
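      // Rate limiting: workq holds one target timestamp per pending write.
      // When it drains, refill it with this thread's share of one second's
      // worth of writes, spaced evenly across the second.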
      if (workq.empty()) {
        for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
          std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
          workq.push(now + offset);
        }
      }
      while (!workq.empty() && workq.front() < now) {
        workq.pop();
        uint64_t shard = my_shards[shard_dist(rng)];
        ShardState& state = (*shard_states_)[shard];
        uint64_t seqno = state.last_written.load() + 1;
        Key key(shard, seqno);
        // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
        ROCKSDB_NAMESPACE::Status status =
            db_->Put(ROCKSDB_NAMESPACE::WriteOptions(),
                     ROCKSDB_NAMESPACE::Slice((const char*)&key, sizeof(key)),
                     ROCKSDB_NAMESPACE::Slice(value));
        assert(status.ok());
        state.last_written.store(seqno);
        state.reader->onWrite(shard);
        ++::stats.written;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // fprintf(stderr, "Writer done\n");
  }

  ~Writer() { thread_.join(); }

 private:
  char pad1[128] __attribute__((__unused__));
  std::vector<ShardState>* shard_states_;
  ROCKSDB_NAMESPACE::DB* db_;
  ROCKSDB_NAMESPACE::port::Thread thread_;
  char pad2[128] __attribute__((__unused__));
};

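// Prints one line of throughput statistics per second until destroyed.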
struct StatsThread {
  explicit StatsThread(ROCKSDB_NAMESPACE::DB* db)
      : db_(db), thread_(&StatsThread::run, this) {}

  void run() {
    auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
    uint64_t wlast = 0, rlast = 0;
    while (!done_.load()) {
      {
        std::unique_lock<std::mutex> lock(cvm_);
        cv_.wait_for(lock, std::chrono::seconds(1));
      }
      auto now = std::chrono::steady_clock::now();
      double elapsed =
          std::chrono::duration_cast<std::chrono::duration<double> >(
              now - tlast).count();
      uint64_t w = ::stats.written.load();
      uint64_t r = ::stats.read.load();
      fprintf(stderr,
              "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
              "r/s %10.0f | cache misses %10ld\n",
              db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
              std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
                  .count(),
              w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
              ::stats.cache_misses.load());
      wlast = w;
      rlast = r;
      tlast = now;
    }
  }

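  // Set done_ while holding the mutex the stats loop waits on, so the
  // notification cannot race with (and be missed by) cv_.wait_for().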
  ~StatsThread() {
    {
      std::lock_guard<std::mutex> guard(cvm_);
      done_.store(true);
    }
    cv_.notify_all();
    thread_.join();
  }

 private:
  ROCKSDB_NAMESPACE::DB* db_;
  std::mutex cvm_;
  std::condition_variable cv_;
  ROCKSDB_NAMESPACE::port::Thread thread_;
  std::atomic<bool> done_{false};
};

int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);

  std::mt19937 rng{std::random_device()()};
  ROCKSDB_NAMESPACE::Status status;
  std::string path =
      ROCKSDB_NAMESPACE::test::PerThreadDBPath("forward_iterator_test");
  fprintf(stderr, "db path is %s\n", path.c_str());
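  // Tune options so background work stays out of the benchmark's way: no
  // compression, no compaction, and write-stall triggers effectively
  // disabled.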
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  options.compression = ROCKSDB_NAMESPACE::CompressionType::kNoCompression;
  options.compaction_style =
      ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = 99999;
  options.level0_stop_writes_trigger = 99999;
  options.use_direct_io_for_flush_and_compaction = true;
  options.write_buffer_size = FLAGS_memtable_size;
  ROCKSDB_NAMESPACE::BlockBasedTableOptions table_options;
  table_options.block_cache =
      ROCKSDB_NAMESPACE::NewLRUCache(FLAGS_block_cache_size);
  table_options.block_size = FLAGS_block_size;
  options.table_factory.reset(
      ROCKSDB_NAMESPACE::NewBlockBasedTableFactory(table_options));

  status = ROCKSDB_NAMESPACE::DestroyDB(path, options);
  assert(status.ok());
  ROCKSDB_NAMESPACE::DB* db_raw;
  status = ROCKSDB_NAMESPACE::DB::Open(options, path, &db_raw);
  assert(status.ok());
  std::unique_ptr<ROCKSDB_NAMESPACE::DB> db(db_raw);

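  // Shards are numbered 1..FLAGS_shards; slot 0 of shard_states is unused.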
  std::vector<ShardState> shard_states(FLAGS_shards + 1);
  std::deque<Reader> readers;
  while (static_cast<int>(readers.size()) < FLAGS_readers) {
    readers.emplace_back(&shard_states, db_raw);
  }
  std::deque<Writer> writers;
  while (static_cast<int>(writers.size()) < FLAGS_writers) {
    writers.emplace_back(&shard_states, db_raw);
  }

  // Each shard gets a random reader and random writer assigned to it
  for (int i = 1; i <= FLAGS_shards; ++i) {
    std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
    std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
    shard_states[i].reader = &readers[reader_dist(rng)];
    shard_states[i].writer = &writers[writer_dist(rng)];
  }

  StatsThread stats_thread(db_raw);
  for (Writer& w : writers) {
    w.start();
  }

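  // Tear down in order: destroying the writers joins their threads, after
  // which no further onWrite() calls can reach the readers; destroying the
  // readers then signals done_ and joins them.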
  writers.clear();
  readers.clear();
}
#endif  // !defined(GFLAGS) || defined(ROCKSDB_LITE)