// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
/**
 * Copyright (C) 2011 the original author or authors.
 * See the notice.md file distributed with this work for additional
 * information regarding copyright ownership.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.rocksdb.benchmark;

import java.io.IOException;
import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Date;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.rocksdb.*;
import org.rocksdb.RocksMemEnv;
import org.rocksdb.util.SizeUnit;

class Stats {
  int id_;
  long start_;
  long finish_;
  double seconds_;
  long done_;
  long found_;
  long lastOpTime_;
  long nextReport_;
  long bytes_;
  StringBuilder message_;
  boolean excludeFromMerge_;

  // TODO(yhchiang): use the following arguments:
  //   (Long)Flag.stats_interval
  //   (Integer)Flag.stats_per_interval

  Stats(int id) {
    id_ = id;
    nextReport_ = 100;
    done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
    start_ = System.nanoTime();
    lastOpTime_ = start_;
    finish_ = start_;
    found_ = 0;
    message_ = new StringBuilder("");
    excludeFromMerge_ = false;
  }

  void merge(final Stats other) {
    if (other.excludeFromMerge_) {
      return;
    }

    done_ += other.done_;
    found_ += other.found_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.length() == 0) {
      message_ = other.message_;
    }
  }

  void stop() {
    finish_ = System.nanoTime();
    seconds_ = (double) (finish_ - start_) * 1e-9;
  }

  void addMessage(String msg) {
    if (message_.length() > 0) {
      message_.append(" ");
    }
    message_.append(msg);
  }

  void setId(int id) { id_ = id; }
  void setExcludeFromMerge() { excludeFromMerge_ = true; }

  void finishedSingleOp(int bytes) {
    done_++;
    lastOpTime_ = System.nanoTime();
    bytes_ += bytes;
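    // Progress reports are emitted at geometrically sparser intervals as the
    // op count grows, so logging overhead stays negligible in long runs.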
    if (done_ >= nextReport_) {
      if (nextReport_ < 1000) {
        nextReport_ += 100;
      } else if (nextReport_ < 5000) {
        nextReport_ += 500;
      } else if (nextReport_ < 10000) {
        nextReport_ += 1000;
      } else if (nextReport_ < 50000) {
        nextReport_ += 5000;
      } else if (nextReport_ < 100000) {
        nextReport_ += 10000;
      } else if (nextReport_ < 500000) {
        nextReport_ += 50000;
      } else {
        nextReport_ += 100000;
      }
      System.err.printf("... Task %s finished %d ops%30s\r", id_, done_, "");
    }
  }

  void report(String name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedSingleOp().
    if (done_ < 1) done_ = 1;

    StringBuilder extra = new StringBuilder("");
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-9;
      extra.append(String.format("%6.1f MB/s", (bytes_ / 1048576.0) / elapsed));
    }
    extra.append(message_.toString());
    double elapsed = (finish_ - start_);
    double throughput = (double) done_ / (elapsed * 1e-9);

    // elapsed is in nanoseconds; multiplying by 1e-3 yields micros/op.
    System.out.format("%-12s : %11.3f micros/op %d ops/sec;%s%s\n",
            name, (elapsed * 1e-3) / done_,
            (long) throughput, (extra.length() == 0 ? "" : " "), extra.toString());
  }
}

public class DbBenchmark {
  enum Order {
    SEQUENTIAL,
    RANDOM
  }

  enum DBState {
    FRESH,
    EXISTING
  }

  static {
    RocksDB.loadLibrary();
  }

  abstract class BenchmarkTask implements Callable<Stats> {
    // TODO(yhchiang): use (Integer)Flag.perf_level.
    public BenchmarkTask(
        int tid, long randSeed, long numEntries, long keyRange) {
      tid_ = tid;
      rand_ = new Random(randSeed + tid * 1000);
      numEntries_ = numEntries;
      keyRange_ = keyRange;
      stats_ = new Stats(tid);
    }

    @Override public Stats call() throws RocksDBException {
      stats_.start_ = System.nanoTime();
      runTask();
      stats_.finish_ = System.nanoTime();
      return stats_;
    }

    abstract protected void runTask() throws RocksDBException;

    protected int tid_;
    protected Random rand_;
    protected long numEntries_;
    protected long keyRange_;
    protected Stats stats_;

    protected void getFixedKey(byte[] key, long sn) {
      generateKeyFromLong(key, sn);
    }

    protected void getRandomKey(byte[] key, long range) {
      generateKeyFromLong(key, Math.abs(rand_.nextLong() % range));
    }
  }

  abstract class WriteTask extends BenchmarkTask {
    public WriteTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch) {
      super(tid, randSeed, numEntries, keyRange);
      writeOpt_ = writeOpt;
      entriesPerBatch_ = entriesPerBatch;
      maxWritesPerSecond_ = -1;
    }

    public WriteTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond) {
      super(tid, randSeed, numEntries, keyRange);
      writeOpt_ = writeOpt;
      entriesPerBatch_ = entriesPerBatch;
      maxWritesPerSecond_ = maxWritesPerSecond;
    }

    @Override public void runTask() throws RocksDBException {
      if (numEntries_ != DbBenchmark.this.num_) {
        stats_.message_.append(String.format(" (%d ops)", numEntries_));
      }
      byte[] key = new byte[keySize_];
      byte[] value = new byte[valueSize_];

      try {
        if (entriesPerBatch_ == 1) {
          for (long i = 0; i < numEntries_; ++i) {
            getKey(key, i, keyRange_);
            DbBenchmark.this.gen_.generate(value);
            db_.put(writeOpt_, key, value);
            stats_.finishedSingleOp(keySize_ + valueSize_);
            writeRateControl(i);
            if (isFinished()) {
              return;
            }
          }
        } else {
          for (long i = 0; i < numEntries_; i += entriesPerBatch_) {
            WriteBatch batch = new WriteBatch();
            for (long j = 0; j < entriesPerBatch_; j++) {
              getKey(key, i + j, keyRange_);
              DbBenchmark.this.gen_.generate(value);
              batch.put(key, value);
              stats_.finishedSingleOp(keySize_ + valueSize_);
            }
            db_.write(writeOpt_, batch);
            batch.dispose();
            writeRateControl(i);
            if (isFinished()) {
              return;
            }
          }
        }
      } catch (InterruptedException e) {
        // thread has been terminated.
      }
    }

    protected void writeRateControl(long writeCount)
        throws InterruptedException {
      if (maxWritesPerSecond_ <= 0) return;
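      // Pace the writer: after writeCount ops, the elapsed time should be at
      // least writeCount / maxWritesPerSecond_ seconds.  Sleep off the surplus
      // whenever we are ahead of schedule by more than a millisecond.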
      long minInterval =
          writeCount * TimeUnit.SECONDS.toNanos(1) / maxWritesPerSecond_;
      long interval = System.nanoTime() - stats_.start_;
      if (minInterval - interval > TimeUnit.MILLISECONDS.toNanos(1)) {
        TimeUnit.NANOSECONDS.sleep(minInterval - interval);
      }
    }

    abstract protected void getKey(byte[] key, long id, long range);
    protected WriteOptions writeOpt_;
    protected long entriesPerBatch_;
    protected long maxWritesPerSecond_;
  }

  class WriteSequentialTask extends WriteTask {
    public WriteSequentialTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch) {
      super(tid, randSeed, numEntries, keyRange,
            writeOpt, entriesPerBatch);
    }
    public WriteSequentialTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch,
        long maxWritesPerSecond) {
      super(tid, randSeed, numEntries, keyRange,
            writeOpt, entriesPerBatch,
            maxWritesPerSecond);
    }
    @Override protected void getKey(byte[] key, long id, long range) {
      getFixedKey(key, id);
    }
  }

  class WriteRandomTask extends WriteTask {
    public WriteRandomTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch) {
      super(tid, randSeed, numEntries, keyRange,
            writeOpt, entriesPerBatch);
    }
    public WriteRandomTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch,
        long maxWritesPerSecond) {
      super(tid, randSeed, numEntries, keyRange,
            writeOpt, entriesPerBatch,
            maxWritesPerSecond);
    }
    @Override protected void getKey(byte[] key, long id, long range) {
      getRandomKey(key, range);
    }
  }

  class WriteUniqueRandomTask extends WriteTask {
    static final int MAX_BUFFER_SIZE = 10000000;
    public WriteUniqueRandomTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch) {
      super(tid, randSeed, numEntries, keyRange,
            writeOpt, entriesPerBatch);
      initRandomKeySequence();
    }
    public WriteUniqueRandomTask(
        int tid, long randSeed, long numEntries, long keyRange,
        WriteOptions writeOpt, long entriesPerBatch,
        long maxWritesPerSecond) {
      super(tid, randSeed, numEntries, keyRange,
            writeOpt, entriesPerBatch,
            maxWritesPerSecond);
      initRandomKeySequence();
    }
    @Override protected void getKey(byte[] key, long id, long range) {
      generateKeyFromLong(key, nextUniqueRandom());
    }

    protected void initRandomKeySequence() {
      bufferSize_ = MAX_BUFFER_SIZE;
      if (bufferSize_ > keyRange_) {
        bufferSize_ = (int) keyRange_;
      }
      currentKeyCount_ = bufferSize_;
      keyBuffer_ = new long[MAX_BUFFER_SIZE];
      for (int k = 0; k < bufferSize_; ++k) {
        keyBuffer_[k] = k;
      }
    }

    /**
     * Semi-randomly return the next unique key.  It is guaranteed to be
     * fully random if keyRange_ <= MAX_BUFFER_SIZE.
     */
    long nextUniqueRandom() {
      if (bufferSize_ == 0) {
        System.err.println("bufferSize_ == 0.");
        return 0;
      }
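      // Incremental Fisher-Yates-style draw: pick a random slot, hand out its
      // key, then refill the slot with the next not-yet-issued key (or, once
      // every key in [0, keyRange_) has entered the buffer, shrink the buffer
      // instead) so each key in the range is returned exactly once.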
      int r = rand_.nextInt(bufferSize_);
      // randomly pick one from the keyBuffer
      long randKey = keyBuffer_[r];
      if (currentKeyCount_ < keyRange_) {
        // if we have not yet inserted all keys, insert next new key to [r].
        keyBuffer_[r] = currentKeyCount_++;
      } else {
        // move the last element to [r] and decrease the size by 1.
        keyBuffer_[r] = keyBuffer_[--bufferSize_];
      }
      return randKey;
    }

    int bufferSize_;
    long currentKeyCount_;
    long[] keyBuffer_;
  }

  class ReadRandomTask extends BenchmarkTask {
    public ReadRandomTask(
        int tid, long randSeed, long numEntries, long keyRange) {
      super(tid, randSeed, numEntries, keyRange);
    }
    @Override public void runTask() throws RocksDBException {
      byte[] key = new byte[keySize_];
      byte[] value = new byte[valueSize_];
      for (long i = 0; i < numEntries_; i++) {
        getRandomKey(key, keyRange_);
        int len = db_.get(key, value);
        if (len != RocksDB.NOT_FOUND) {
          stats_.found_++;
          stats_.finishedSingleOp(keySize_ + valueSize_);
        } else {
          stats_.finishedSingleOp(keySize_);
        }
        if (isFinished()) {
          return;
        }
      }
    }
  }

  class ReadSequentialTask extends BenchmarkTask {
    public ReadSequentialTask(
        int tid, long randSeed, long numEntries, long keyRange) {
      super(tid, randSeed, numEntries, keyRange);
    }
    @Override public void runTask() throws RocksDBException {
      RocksIterator iter = db_.newIterator();
      long i;
      for (iter.seekToFirst(), i = 0;
           iter.isValid() && i < numEntries_;
           iter.next(), ++i) {
        stats_.found_++;
        stats_.finishedSingleOp(iter.key().length + iter.value().length);
        if (isFinished()) {
          iter.dispose();
          return;
        }
      }
      iter.dispose();
    }
  }
  public DbBenchmark(Map<Flag, Object> flags) throws Exception {
    benchmarks_ = (List<String>) flags.get(Flag.benchmarks);
    num_ = (Integer) flags.get(Flag.num);
    threadNum_ = (Integer) flags.get(Flag.threads);
    reads_ = (Integer) (flags.get(Flag.reads) == null ?
        flags.get(Flag.num) : flags.get(Flag.reads));
    keySize_ = (Integer) flags.get(Flag.key_size);
    valueSize_ = (Integer) flags.get(Flag.value_size);
    compressionRatio_ = (Double) flags.get(Flag.compression_ratio);
    useExisting_ = (Boolean) flags.get(Flag.use_existing_db);
    randSeed_ = (Long) flags.get(Flag.seed);
    databaseDir_ = (String) flags.get(Flag.db);
    writesPerSeconds_ = (Integer) flags.get(Flag.writes_per_second);
    memtable_ = (String) flags.get(Flag.memtablerep);
    maxWriteBufferNumber_ = (Integer) flags.get(Flag.max_write_buffer_number);
    prefixSize_ = (Integer) flags.get(Flag.prefix_size);
    keysPerPrefix_ = (Integer) flags.get(Flag.keys_per_prefix);
    hashBucketCount_ = (Long) flags.get(Flag.hash_bucket_count);
    usePlainTable_ = (Boolean) flags.get(Flag.use_plain_table);
    useMemenv_ = (Boolean) flags.get(Flag.use_mem_env);
    flags_ = flags;
    finishLock_ = new Object();
    // options.setPrefixSize((Integer)flags_.get(Flag.prefix_size));
    // options.setKeysPerPrefix((Long)flags_.get(Flag.keys_per_prefix));
    compressionType_ = (String) flags.get(Flag.compression_type);
    compression_ = CompressionType.NO_COMPRESSION;
    try {
      if (compressionType_ != null) {
        final CompressionType compressionType =
            CompressionType.getCompressionType(compressionType_);
        if (compressionType != null &&
            compressionType != CompressionType.NO_COMPRESSION) {
          System.loadLibrary(compressionType.getLibraryName());
        }
      }
    } catch (UnsatisfiedLinkError e) {
      System.err.format("Unable to load %s library: %s%n" +
                        "No compression is used.%n",
          compressionType_, e.toString());
      compressionType_ = "none";
    }
    gen_ = new RandomGenerator(randSeed_, compressionRatio_);
  }

  private void prepareReadOptions(ReadOptions options) {
    options.setVerifyChecksums((Boolean)flags_.get(Flag.verify_checksum));
    options.setTailing((Boolean)flags_.get(Flag.use_tailing_iterator));
  }

  private void prepareWriteOptions(WriteOptions options) {
    options.setSync((Boolean)flags_.get(Flag.sync));
    options.setDisableWAL((Boolean)flags_.get(Flag.disable_wal));
  }

  private void prepareOptions(Options options) throws RocksDBException {
    options.setCreateIfMissing(!useExisting_);
    if (useMemenv_) {
      options.setEnv(new RocksMemEnv(Env.getDefault()));
    }
    switch (memtable_) {
      case "skip_list":
        options.setMemTableConfig(new SkipListMemTableConfig());
        break;
      case "vector":
        options.setMemTableConfig(new VectorMemTableConfig());
        break;
      case "hash_linkedlist":
        options.setMemTableConfig(
            new HashLinkedListMemTableConfig()
                .setBucketCount(hashBucketCount_));
        options.useFixedLengthPrefixExtractor(prefixSize_);
        break;
      case "hash_skiplist":
      case "prefix_hash":
        options.setMemTableConfig(
            new HashSkipListMemTableConfig()
                .setBucketCount(hashBucketCount_));
        options.useFixedLengthPrefixExtractor(prefixSize_);
        break;
      default:
        System.err.format(
            "unable to detect the specified memtable, " +
                "using the default memtable factory %s%n",
            options.memTableFactoryName());
        break;
    }
    if (usePlainTable_) {
      options.setTableFormatConfig(
          new PlainTableConfig().setKeySize(keySize_));
    } else {
      BlockBasedTableConfig table_options = new BlockBasedTableConfig();
      table_options.setBlockSize((Long)flags_.get(Flag.block_size))
                   .setBlockCacheSize((Long)flags_.get(Flag.cache_size))
                   .setCacheNumShardBits(
                      (Integer)flags_.get(Flag.cache_numshardbits));
      options.setTableFormatConfig(table_options);
    }
    options.setWriteBufferSize(
        (Long)flags_.get(Flag.write_buffer_size));
    options.setMaxWriteBufferNumber(
        (Integer)flags_.get(Flag.max_write_buffer_number));
    options.setMaxBackgroundCompactions(
        (Integer)flags_.get(Flag.max_background_compactions));
    options.getEnv().setBackgroundThreads(
        (Integer)flags_.get(Flag.max_background_compactions));
    options.setMaxBackgroundFlushes(
        (Integer)flags_.get(Flag.max_background_flushes));
    options.setMaxBackgroundJobs((Integer) flags_.get(Flag.max_background_jobs));
    options.setMaxOpenFiles(
        (Integer)flags_.get(Flag.open_files));
    options.setUseFsync(
        (Boolean)flags_.get(Flag.use_fsync));
    options.setWalDir(
        (String)flags_.get(Flag.wal_dir));
    options.setDeleteObsoleteFilesPeriodMicros(
        (Integer)flags_.get(Flag.delete_obsolete_files_period_micros));
    options.setTableCacheNumshardbits(
        (Integer)flags_.get(Flag.table_cache_numshardbits));
    options.setAllowMmapReads(
        (Boolean)flags_.get(Flag.mmap_read));
    options.setAllowMmapWrites(
        (Boolean)flags_.get(Flag.mmap_write));
    options.setAdviseRandomOnOpen(
        (Boolean)flags_.get(Flag.advise_random_on_open));
    options.setUseAdaptiveMutex(
        (Boolean)flags_.get(Flag.use_adaptive_mutex));
    options.setBytesPerSync(
        (Long)flags_.get(Flag.bytes_per_sync));
    options.setBloomLocality(
        (Integer)flags_.get(Flag.bloom_locality));
    options.setMinWriteBufferNumberToMerge(
        (Integer)flags_.get(Flag.min_write_buffer_number_to_merge));
    options.setMemtablePrefixBloomSizeRatio((Double) flags_.get(Flag.memtable_bloom_size_ratio));
    options.setNumLevels(
        (Integer)flags_.get(Flag.num_levels));
    options.setTargetFileSizeBase(
        (Integer)flags_.get(Flag.target_file_size_base));
    options.setTargetFileSizeMultiplier((Integer)flags_.get(Flag.target_file_size_multiplier));
    options.setMaxBytesForLevelBase(
        (Integer)flags_.get(Flag.max_bytes_for_level_base));
    options.setMaxBytesForLevelMultiplier((Double) flags_.get(Flag.max_bytes_for_level_multiplier));
    options.setLevelZeroStopWritesTrigger(
        (Integer)flags_.get(Flag.level0_stop_writes_trigger));
    options.setLevelZeroSlowdownWritesTrigger(
        (Integer)flags_.get(Flag.level0_slowdown_writes_trigger));
    options.setLevelZeroFileNumCompactionTrigger(
        (Integer)flags_.get(Flag.level0_file_num_compaction_trigger));
    options.setMaxCompactionBytes(
        (Long) flags_.get(Flag.max_compaction_bytes));
    options.setDisableAutoCompactions(
        (Boolean)flags_.get(Flag.disable_auto_compactions));
    options.setMaxSuccessiveMerges(
        (Integer)flags_.get(Flag.max_successive_merges));
    options.setWalTtlSeconds((Long)flags_.get(Flag.wal_ttl_seconds));
    options.setWalSizeLimitMB((Long)flags_.get(Flag.wal_size_limit_MB));
    if (flags_.get(Flag.java_comparator) != null) {
      options.setComparator(
          (AbstractComparator)flags_.get(Flag.java_comparator));
    }

    /* TODO(yhchiang): enable the following parameters
    options.setCompressionType((String)flags_.get(Flag.compression_type));
    options.setCompressionLevel((Integer)flags_.get(Flag.compression_level));
    options.setMinLevelToCompress((Integer)flags_.get(Flag.min_level_to_compress));
    options.setHdfs((String)flags_.get(Flag.hdfs)); // env
    options.setStatistics((Boolean)flags_.get(Flag.statistics));
    options.setUniversalSizeRatio(
        (Integer)flags_.get(Flag.universal_size_ratio));
    options.setUniversalMinMergeWidth(
        (Integer)flags_.get(Flag.universal_min_merge_width));
    options.setUniversalMaxMergeWidth(
        (Integer)flags_.get(Flag.universal_max_merge_width));
    options.setUniversalMaxSizeAmplificationPercent(
        (Integer)flags_.get(Flag.universal_max_size_amplification_percent));
    options.setUniversalCompressionSizePercent(
        (Integer)flags_.get(Flag.universal_compression_size_percent));
    // TODO(yhchiang): add RocksDB.openForReadOnly() to enable Flag.readonly
    // TODO(yhchiang): enable Flag.merge_operator by switch
    options.setAccessHintOnCompactionStart(
        (String)flags_.get(Flag.compaction_fadvice));
    // available values of fadvice are "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" for fadvice
    */
  }

  private void run() throws RocksDBException {
    if (!useExisting_) {
      destroyDb();
    }
    Options options = new Options();
    prepareOptions(options);
    open(options);

    printHeader(options);

    for (String benchmark : benchmarks_) {
      List<Callable<Stats>> tasks = new ArrayList<Callable<Stats>>();
      List<Callable<Stats>> bgTasks = new ArrayList<Callable<Stats>>();
      WriteOptions writeOpt = new WriteOptions();
      prepareWriteOptions(writeOpt);
      ReadOptions readOpt = new ReadOptions();
      prepareReadOptions(readOpt);
      int currentTaskId = 0;
      boolean known = true;

      switch (benchmark) {
        case "fillseq":
          tasks.add(new WriteSequentialTask(
              currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
          break;
        case "fillbatch":
          tasks.add(
              new WriteSequentialTask(currentTaskId++, randSeed_, num_, num_, writeOpt, 1000));
          break;
        case "fillrandom":
          tasks.add(new WriteRandomTask(
              currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
          break;
        case "filluniquerandom":
          tasks.add(new WriteUniqueRandomTask(
              currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
          break;
        case "fillsync":
          writeOpt.setSync(true);
          tasks.add(new WriteRandomTask(
              currentTaskId++, randSeed_, num_ / 1000, num_ / 1000,
              writeOpt, 1));
          break;
        case "readseq":
          for (int t = 0; t < threadNum_; ++t) {
            tasks.add(new ReadSequentialTask(
                currentTaskId++, randSeed_, reads_ / threadNum_, num_));
          }
          break;
        case "readrandom":
          for (int t = 0; t < threadNum_; ++t) {
            tasks.add(new ReadRandomTask(
                currentTaskId++, randSeed_, reads_ / threadNum_, num_));
          }
          break;
        case "readwhilewriting":
          WriteTask writeTask = new WriteRandomTask(
              -1, randSeed_, Long.MAX_VALUE, num_, writeOpt, 1, writesPerSeconds_);
          writeTask.stats_.setExcludeFromMerge();
          bgTasks.add(writeTask);
          for (int t = 0; t < threadNum_; ++t) {
            tasks.add(new ReadRandomTask(
                currentTaskId++, randSeed_, reads_ / threadNum_, num_));
          }
          break;
        case "readhot":
          for (int t = 0; t < threadNum_; ++t) {
            tasks.add(new ReadRandomTask(
                currentTaskId++, randSeed_, reads_ / threadNum_, num_ / 100));
          }
          break;
        case "delete":
          destroyDb();
          open(options);
          break;
        default:
          known = false;
          System.err.println("Unknown benchmark: " + benchmark);
          break;
      }
      if (known) {
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorService bgExecutor = Executors.newCachedThreadPool();
        try {
          // measure only the main executor time
          List<Future<Stats>> bgResults = new ArrayList<Future<Stats>>();
          for (Callable<Stats> bgTask : bgTasks) {
            bgResults.add(bgExecutor.submit(bgTask));
          }
          start();
          List<Future<Stats>> results = executor.invokeAll(tasks);
          executor.shutdown();
          boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS);
          if (!finished) {
            System.out.format(
                "Benchmark %s was not finished before timeout.%n",
                benchmark);
            executor.shutdownNow();
          }
          setFinished(true);
          bgExecutor.shutdown();
          finished = bgExecutor.awaitTermination(10, TimeUnit.SECONDS);
          if (!finished) {
            System.out.format(
                "Benchmark %s was not finished before timeout.%n",
                benchmark);
            bgExecutor.shutdownNow();
          }

          stop(benchmark, results, currentTaskId);
        } catch (InterruptedException e) {
          System.err.println(e);
        }
      }
      writeOpt.dispose();
      readOpt.dispose();
    }
    options.dispose();
    db_.close();
  }

  private void printHeader(Options options) {
    System.out.printf("Keys:     %d bytes each\n", keySize_);
    System.out.printf("Values:   %d bytes each (%d bytes after compression)\n",
        valueSize_,
        (int) (valueSize_ * compressionRatio_ + 0.5));
    System.out.printf("Entries:  %d\n", num_);
    System.out.printf("RawSize:  %.1f MB (estimated)\n",
        ((double)(keySize_ + valueSize_) * num_) / SizeUnit.MB);
    System.out.printf("FileSize: %.1f MB (estimated)\n",
        (((keySize_ + valueSize_ * compressionRatio_) * num_) / SizeUnit.MB));
    System.out.format("Memtable Factory: %s%n", options.memTableFactoryName());
    System.out.format("Prefix:   %d bytes%n", prefixSize_);
    System.out.format("Compression: %s%n", compressionType_);
    printWarnings();
    System.out.printf("------------------------------------------------\n");
  }

  void printWarnings() {
    boolean assertsEnabled = false;
    assert assertsEnabled = true; // Intentional side effect!!!
    if (assertsEnabled) {
      System.out.printf(
          "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
    }
  }

  private void open(Options options) throws RocksDBException {
    System.out.println("Using database directory: " + databaseDir_);
    db_ = RocksDB.open(options, databaseDir_);
  }

  private void start() {
    setFinished(false);
    startTime_ = System.nanoTime();
  }

  private void stop(
      String benchmark, List<Future<Stats>> results, int concurrentThreads) {
    long endTime = System.nanoTime();
    double elapsedSeconds =
        1.0d * (endTime - startTime_) / TimeUnit.SECONDS.toNanos(1);

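    // Merge the per-task stats into one aggregate; tasks flagged with
    // excludeFromMerge_ (e.g. the background writer in readwhilewriting)
    // are skipped so they do not distort the read numbers.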
    Stats stats = new Stats(-1);
    int taskFinishedCount = 0;
    for (Future<Stats> result : results) {
      if (result.isDone()) {
        try {
          Stats taskStats = result.get(3, TimeUnit.SECONDS);
          if (!result.isCancelled()) {
            taskFinishedCount++;
          }
          stats.merge(taskStats);
        } catch (Exception e) {
          // then it's not successful, the output will indicate this
        }
      }
    }
    String extra = "";
    if (benchmark.contains("read")) {
      extra = String.format(" %d / %d found; ", stats.found_, stats.done_);
    } else {
      extra = String.format(" %d ops done; ", stats.done_);
    }

    System.out.printf(
        "%-16s : %11.5f micros/op; %6.1f MB/s;%s %d / %d task(s) finished.\n",
        benchmark, elapsedSeconds / stats.done_ * 1e6,
        (stats.bytes_ / 1048576.0) / elapsedSeconds, extra,
        taskFinishedCount, concurrentThreads);
  }

  public void generateKeyFromLong(byte[] slice, long n) {
    assert(n >= 0);
    int startPos = 0;

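    // Key layout: an optional fixed-size prefix (prefixSize_ bytes, derived
    // from n modulo the number of prefixes) followed by the zero-padded
    // decimal digits of n filling the rest of the slice.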
    if (keysPerPrefix_ > 0) {
      long numPrefix = (num_ + keysPerPrefix_ - 1) / keysPerPrefix_;
      long prefix = n % numPrefix;
      int bytesToFill = Math.min(prefixSize_, 8);
      for (int i = 0; i < bytesToFill; ++i) {
        slice[i] = (byte) (prefix % 256);
        prefix /= 256;
      }
      // Pad the rest of the prefix with '0' when prefixSize_ exceeds 8 bytes.
      for (int i = bytesToFill; i < prefixSize_; ++i) {
        slice[i] = '0';
      }
      startPos = prefixSize_;
    }

    for (int i = slice.length - 1; i >= startPos; --i) {
      slice[i] = (byte) ('0' + (n % 10));
      n /= 10;
    }
  }

  private void destroyDb() {
    if (db_ != null) {
      db_.close();
    }
    // TODO(yhchiang): develop our own FileUtil
    // FileUtil.deleteDir(databaseDir_);
  }

  private void printStats() {
  }

  static void printHelp() {
    System.out.println("usage:");
    for (Flag flag : Flag.values()) {
      System.out.format("  --%s%n\t%s%n",
          flag.name(),
          flag.desc());
      if (flag.getDefaultValue() != null) {
        System.out.format("\tDEFAULT: %s%n",
            flag.getDefaultValue().toString());
      }
    }
  }

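  /**
   * Entry point.  An illustrative invocation (classpath and jar name are
   * placeholders, not part of this file):
   *
   *   java -cp rocksdbjni.jar:. org.rocksdb.benchmark.DbBenchmark \
   *       --benchmarks=fillseq,readrandom --num=1000000 --threads=4
   */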
  public static void main(String[] args) throws Exception {
    Map<Flag, Object> flags = new EnumMap<Flag, Object>(Flag.class);
    for (Flag flag : Flag.values()) {
      if (flag.getDefaultValue() != null) {
        flags.put(flag, flag.getDefaultValue());
      }
    }
    for (String arg : args) {
      boolean valid = false;
      if (arg.equals("--help") || arg.equals("-h")) {
        printHelp();
        System.exit(0);
      }
      if (arg.startsWith("--")) {
        try {
          // Split on the first '=' only, so values may themselves contain '='.
          String[] parts = arg.substring(2).split("=", 2);
          if (parts.length >= 1) {
            Flag key = Flag.valueOf(parts[0]);
            if (key != null) {
              Object value = null;
              if (parts.length >= 2) {
                value = key.parseValue(parts[1]);
              }
              flags.put(key, value);
              valid = true;
            }
          }
        } catch (Exception e) {
          // Unknown flag or unparsable value; reported as invalid below.
        }
      }
      if (!valid) {
        System.err.println("Invalid argument " + arg);
        System.exit(1);
      }
    }
    new DbBenchmark(flags).run();
  }

  private enum Flag {
    benchmarks(Arrays.asList("fillseq", "readrandom", "fillrandom"),
        "Comma-separated list of operations to run in the specified order\n"
            + "\tActual benchmarks:\n"
            + "\t\tfillseq          -- write N values in sequential key order in async mode.\n"
            + "\t\tfillrandom       -- write N values in random key order in async mode.\n"
            + "\t\tfillbatch        -- write N/1000 batches where each batch has 1000 values\n"
            + "\t\t                   in sequential key order in async mode.\n"
            + "\t\tfillsync         -- write N/1000 values in random key order in sync mode.\n"
            + "\t\tfill100K         -- write N/1000 100K values in random order in async mode.\n"
            + "\t\treadseq          -- read N times sequentially.\n"
            + "\t\treadrandom       -- read N times in random order.\n"
            + "\t\treadhot          -- read N times in random order from 1% section of DB.\n"
            + "\t\treadwhilewriting -- measure the read performance of multiple readers\n"
            + "\t\t                   with a single background writer.  The write rate of\n"
            + "\t\t                   the background writer is capped by --writes_per_second.\n"
            + "\tMeta Operations:\n"
            + "\t\tdelete            -- delete DB") {
      @Override public Object parseValue(String value) {
        return new ArrayList<String>(Arrays.asList(value.split(",")));
      }
    },
    compression_ratio(0.5d,
        "Arrange to generate values that shrink to this fraction of\n" +
        "\ttheir original size after compression.") {
      @Override public Object parseValue(String value) {
        return Double.parseDouble(value);
      }
    },
    use_existing_db(false,
        "If true, do not destroy the existing database.  If you set this\n" +
        "\tflag and also specify a benchmark that wants a fresh database,\n" +
        "\tthat benchmark will fail.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    num(1000000,
        "Number of key/values to place in database.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    threads(1,
        "Number of concurrent threads to run.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    reads(null,
        "Number of read operations to do.  If negative, do --num reads.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    key_size(16,
        "The size of each key in bytes.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    value_size(100,
        "The size of each value in bytes.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    write_buffer_size(4L * SizeUnit.MB,
        "Number of bytes to buffer in memtable before compacting\n" +
        "\t(initialized to default value by 'main'.)") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    max_write_buffer_number(2,
             "The number of in-memory memtables. Each memtable is of size\n" +
             "\twrite_buffer_size.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    prefix_size(0, "Controls the prefix size for HashSkipList, HashLinkedList,\n" +
                   "\tand plain table.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    keys_per_prefix(0, "Controls the average number of keys generated\n" +
             "\tper prefix, 0 means no special handling of the prefix,\n" +
             "\ti.e. use the prefix that comes with the generated random number.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    memtablerep("skip_list",
        "The memtable format.  Available options are\n" +
        "\tskip_list,\n" +
        "\tvector,\n" +
        "\thash_linkedlist,\n" +
        "\thash_skiplist (prefix_hash.)") {
      @Override public Object parseValue(String value) {
        return value;
      }
    },
    hash_bucket_count(SizeUnit.MB,
        "The number of hash buckets used in the hash-bucket-based\n" +
        "\tmemtables.  Memtables that currently support this argument are\n" +
        "\thash_linkedlist and hash_skiplist.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    writes_per_second(10000,
        "The write rate of the background writer used in the\n" +
        "\t`readwhilewriting` benchmark.  A non-positive number means the\n" +
        "\twrite rate is unbounded.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    use_plain_table(false,
        "Use plain-table sst format.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    cache_size(-1L,
        "Number of bytes to use as a cache of uncompressed data.\n" +
        "\tNegative means use default settings.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    seed(0L,
        "Seed base for random number generators.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    num_levels(7,
        "The total number of levels.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    numdistinct(1000L,
        "Number of distinct keys to use. Used in RandomWithVerify to\n" +
        "\tread/write on fewer keys so that gets are more likely to find the\n" +
        "\tkey and puts are more likely to update the same key.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    merge_keys(-1L,
        "Number of distinct keys to use for MergeRandom and\n" +
        "\tReadRandomMergeRandom.\n" +
        "\tIf negative, there will be FLAGS_num keys.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    bloom_locality(0, "Controls the locality of bloom filter probes.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    duration(0, "Time in seconds for the random-ops tests to run.\n" +
        "\tWhen 0 then num & reads determine the test duration.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    num_multi_db(0,
        "Number of DBs used in the benchmark. 0 means single DB.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    histogram(false, "Print histogram of operation timings.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    min_write_buffer_number_to_merge(
        defaultOptions_.minWriteBufferNumberToMerge(),
        "The minimum number of write buffers that will be merged together\n" +
        "\tbefore writing to storage. This is cheap because it is an\n" +
        "\tin-memory merge. If this feature is not enabled, then all these\n" +
        "\twrite buffers are flushed to L0 as separate files and this\n" +
        "\tincreases read amplification because a get request has to check\n" +
        "\tin all of these files. Also, an in-memory merge may result in\n" +
        "\twriting less data to storage if there are duplicate records\n" +
        "\tin each of these individual write buffers.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    max_background_compactions(
        defaultOptions_.maxBackgroundCompactions(),
        "The maximum number of concurrent background compactions\n" +
        "\tthat can occur in parallel.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    max_background_flushes(
        defaultOptions_.maxBackgroundFlushes(),
        "The maximum number of concurrent background flushes\n" +
        "\tthat can occur in parallel.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    max_background_jobs(defaultOptions_.maxBackgroundJobs(),
        "The maximum number of concurrent background jobs\n"
            + "\tthat can occur in parallel.") {
      @Override
      public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    /* TODO(yhchiang): enable the following
    compaction_style((int32_t) defaultOptions_.compactionStyle(),
        "style of compaction: level-based vs universal.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },*/
    universal_size_ratio(0,
        "Percentage flexibility while comparing file size\n" +
        "\t(for universal compaction only).") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    universal_min_merge_width(0, "The minimum number of files in a\n" +
        "\tsingle compaction run (for universal compaction only).") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    universal_max_merge_width(0, "The max number of files to compact\n" +
        "\tin universal style compaction.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    universal_max_size_amplification_percent(0,
        "The max size amplification for universal style compaction.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    universal_compression_size_percent(-1,
        "The percentage of the database to compress for universal\n" +
        "\tcompaction. -1 means compress everything.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    block_size(defaultBlockBasedTableOptions_.blockSize(),
        "Number of bytes in a block.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    compressed_cache_size(-1L,
        "Number of bytes to use as a cache of compressed data.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    open_files(defaultOptions_.maxOpenFiles(),
        "Maximum number of files to keep open at the same time\n" +
        "\t(use default if == 0)") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    bloom_bits(-1, "Bloom filter bits per key. Negative means\n" +
        "\tuse default settings.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    memtable_bloom_size_ratio(0.0d, "Ratio of memtable used by the bloom filter.\n"
            + "\t0 means no bloom filter.") {
      @Override public Object parseValue(String value) {
        return Double.parseDouble(value);
      }
    },
    cache_numshardbits(-1, "Number of shards for the block cache\n" +
        "\tis 2 ** cache_numshardbits. Negative means use default settings.\n" +
        "\tThis is applied only if FLAGS_cache_size is non-negative.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    verify_checksum(false, "Verify checksum for every block read\n" +
        "\tfrom storage.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    statistics(false, "Database statistics.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    writes(-1L, "Number of write operations to do. If negative, do\n" +
        "\t--num writes.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    sync(false, "Sync all writes to disk.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    use_fsync(false, "If true, issue fsync instead of fdatasync.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    disable_wal(false, "If true, do not write WAL for write.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    wal_dir("", "If not empty, use the given dir for WAL.") {
      @Override public Object parseValue(String value) {
        return value;
      }
    },
    target_file_size_base(2 * 1048576, "Target file size at level-1") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    target_file_size_multiplier(1,
        "A multiplier to compute target level-N file size (N >= 2)") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    max_bytes_for_level_base(10 * 1048576,
        "Max bytes for level-1") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    max_bytes_for_level_multiplier(10.0d,
        "A multiplier to compute max bytes for level-N (N >= 2)") {
      @Override public Object parseValue(String value) {
        return Double.parseDouble(value);
      }
    },
    level0_stop_writes_trigger(12, "Number of files in level-0\n" +
        "\tthat will trigger put stop.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    level0_slowdown_writes_trigger(8, "Number of files in level-0\n" +
        "\tthat will slow down writes.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    level0_file_num_compaction_trigger(4, "Number of files in level-0\n" +
        "\twhen compactions start.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    readwritepercent(90, "Ratio of reads to reads/writes (expressed as a\n" +
        "\tpercentage) for the ReadRandomWriteRandom workload. The default\n" +
        "\tvalue 90 means 90% of all read/write operations are reads. In\n" +
        "\tother words, 9 gets for every 1 put.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
1287     mergereadpercent(70,"Ratio of merges to merges&reads (expressed\n" +
1288         "\tas percentage) for the ReadRandomMergeRandom workload. The\n" +
1289         "\tdefault value 70 means 70% out of all read and merge operations\n" +
1290         "\tare merges. In other words, 7 merges for every 3 gets.") {
parseValue(String value)1291       @Override public Object parseValue(String value) {
1292         return Integer.parseInt(value);
1293       }
1294     },
1295     deletepercent(2,"Percentage of deletes out of reads/writes/\n" +
1296         "\tdeletes (used in RandomWithVerify only). RandomWithVerify\n" +
1297         "\tcalculates writepercent as (100 - FLAGS_readwritepercent -\n" +
1298         "\tdeletepercent), so deletepercent must be smaller than (100 -\n" +
1299         "\tFLAGS_readwritepercent)") {
parseValue(String value)1300       @Override public Object parseValue(String value) {
1301         return Integer.parseInt(value);
1302       }
1303     },
    delete_obsolete_files_period_micros(0, "Option to delete\n" +
        "\tobsolete files periodically. 0 means that obsolete files are\n" +
        "\tdeleted after every compaction run.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    compression_type("snappy",
        "Algorithm used to compress the database.") {
      @Override public Object parseValue(String value) {
        return value;
      }
    },
    compression_level(-1,
        "Compression level. For zlib this should be -1 for the\n" +
        "\tdefault level, or between 0 and 9.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    min_level_to_compress(-1, "If non-negative, compression starts\n" +
        "\tfrom this level. Levels with number < min_level_to_compress are\n" +
        "\tnot compressed. Otherwise, apply compression_type to\n" +
        "\tall levels.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    table_cache_numshardbits(4, "") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    stats_interval(0L, "Stats are reported every N operations when\n" +
        "\tthis is greater than zero. When 0 the interval grows over time.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    stats_per_interval(0, "Reports additional stats per interval when\n" +
        "\tthis is greater than 0.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    perf_level(0, "Level of perf collection.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    soft_rate_limit(0.0d, "") {
      @Override public Object parseValue(String value) {
        return Double.parseDouble(value);
      }
    },
    hard_rate_limit(0.0d, "When not equal to 0 this makes threads\n" +
        "\tsleep at each stats reporting interval until the compaction\n" +
        "\tscore for all levels is less than or equal to this value.") {
      @Override public Object parseValue(String value) {
        return Double.parseDouble(value);
      }
    },
    rate_limit_delay_max_milliseconds(1000,
        "When hard_rate_limit is set then this is the max time a put will\n" +
        "\tbe stalled.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    max_compaction_bytes(0L, "Limit the number of bytes in one compaction\n" +
        "\tto be lower than this threshold. But it's not guaranteed.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    readonly(false, "Run read-only benchmarks.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    disable_auto_compactions(false, "Do not auto trigger compactions.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    wal_ttl_seconds(0L, "Set the TTL for the WAL files in seconds.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    wal_size_limit_MB(0L, "Set the size limit for the WAL files\n" +
        "\tin MB.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    /* TODO(yhchiang): enable the following
    direct_reads(rocksdb::EnvOptions().use_direct_reads,
        "Allow direct I/O reads.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    direct_writes(rocksdb::EnvOptions().use_direct_writes,
        "Allow direct I/O writes.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    */
    mmap_read(false,
        "Allow reads to occur via mmap-ing files.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    mmap_write(false,
        "Allow writes to occur via mmap-ing files.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    advise_random_on_open(defaultOptions_.adviseRandomOnOpen(),
        "Advise random access on table file open.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    compaction_fadvice("NORMAL",
        "Access pattern advice when a file is compacted.") {
      @Override public Object parseValue(String value) {
        return value;
      }
    },
    use_tailing_iterator(false,
        "Use tailing iterator to access a series of keys instead of get.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    use_adaptive_mutex(defaultOptions_.useAdaptiveMutex(),
        "Use adaptive mutex.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    bytes_per_sync(defaultOptions_.bytesPerSync(),
        "Allows OS to incrementally sync files to disk while they are\n" +
        "\tbeing written, in the background. Issue one request for every\n" +
        "\tbytes_per_sync written. 0 turns it off.") {
      @Override public Object parseValue(String value) {
        return Long.parseLong(value);
      }
    },
    filter_deletes(false, "If true, deletes use a bloom filter and drop\n" +
        "\tthe delete if the key is not present.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    max_successive_merges(0, "Maximum number of successive merge\n" +
        "\toperations on a key in the memtable.") {
      @Override public Object parseValue(String value) {
        return Integer.parseInt(value);
      }
    },
    db(getTempDir("rocksdb-jni"),
        "Use the db with the following name.") {
      @Override public Object parseValue(String value) {
        return value;
      }
    },
    use_mem_env(false, "Use RocksMemEnv instead of default filesystem-based\n" +
        "\tenvironment.") {
      @Override public Object parseValue(String value) {
        return parseBoolean(value);
      }
    },
    java_comparator(null, "Class name of a Java Comparator to use instead\n" +
        "\tof the default C++ ByteWiseComparatorImpl. Must be available on\n" +
        "\tthe classpath.") {
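      // Reflectively loads the named class and invokes its public
      // ComparatorOptions constructor. A hypothetical invocation,
      // assuming the usual --name=value flag syntax, might look like:
      //   --java_comparator=org.rocksdb.util.BytewiseComparator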
      @Override
      protected Object parseValue(final String value) {
        try {
          final ComparatorOptions copt = new ComparatorOptions();
          final Class<AbstractComparator> clsComparator =
              (Class<AbstractComparator>)Class.forName(value);
          final Constructor<AbstractComparator> cstr =
              clsComparator.getConstructor(ComparatorOptions.class);
          return cstr.newInstance(copt);
        } catch(final ClassNotFoundException cnfe) {
          throw new IllegalArgumentException("Java Comparator '" + value + "'" +
              " not found on the classpath", cnfe);
        } catch(final NoSuchMethodException nsme) {
          throw new IllegalArgumentException("Java Comparator '" + value + "'" +
              " does not have a public ComparatorOptions constructor", nsme);
        } catch(final IllegalAccessException | InstantiationException
            | InvocationTargetException ie) {
          throw new IllegalArgumentException("Unable to construct Java" +
              " Comparator '" + value + "'", ie);
        }
      }
    };

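    // Each enum constant doubles as a command-line flag named after the
    // constant; parseValue() converts the raw string argument into the
    // flag's typed value (e.g. "--readwritepercent=80", assuming the
    // --name=value syntax accepted by this benchmark's argument parser).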
    private Flag(Object defaultValue, String desc) {
      defaultValue_ = defaultValue;
      desc_ = desc;
    }

    public Object getDefaultValue() {
      return defaultValue_;
    }

    public String desc() {
      return desc_;
    }

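    // Accepts "1"/"0" in addition to the "true"/"false" spellings
    // understood by Boolean.parseBoolean.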
    public boolean parseBoolean(String value) {
      if (value.equals("1")) {
        return true;
      } else if (value.equals("0")) {
        return false;
      }
      return Boolean.parseBoolean(value);
    }

    protected abstract Object parseValue(String value);

    private final Object defaultValue_;
    private final String desc_;
  }

  private final static String DEFAULT_TEMP_DIR = "/tmp";

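  // Creates a unique temporary directory; on failure, falls back to a
  // path under DEFAULT_TEMP_DIR.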
  private static String getTempDir(final String dirName) {
    try {
      return Files.createTempDirectory(dirName).toAbsolutePath().toString();
    } catch(final IOException ioe) {
      System.err.println("Unable to create temp directory, defaulting to: " +
          DEFAULT_TEMP_DIR);
      // File.separator, not File.pathSeparator: we are joining path
      // components, not classpath entries.
      return DEFAULT_TEMP_DIR + File.separator + dirName;
    }
  }

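  // Generates pseudo-random byte values whose compressibility
  // approximates a configured target ratio.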
  private static class RandomGenerator {
    private final byte[] data_;
    private int dataLength_;
    private int position_;
    private double compressionRatio_;
    Random rand_;

    private RandomGenerator(long seed, double compressionRatio) {
      // We use a limited amount of data over and over again and ensure
      // that it is larger than the compression window (32KB), and also
      // large enough to serve all typical value sizes we want to write.
      byte[] value = new byte[100];
      rand_ = new Random(seed);
      dataLength_ = value.length * 10000;
      data_ = new byte[dataLength_];
      compressionRatio_ = compressionRatio;
      int pos = 0;
      while (pos < dataLength_) {
        compressibleBytes(value);
        System.arraycopy(value, 0, data_, pos,
                         Math.min(value.length, dataLength_ - pos));
        pos += value.length;
      }
    }

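    // Fills 'value' with a baseLength-byte run of random printable
    // characters, then repeats that run to the end of the array, so the
    // data compresses down to roughly compressionRatio_ of its size.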
    private void compressibleBytes(byte[] value) {
      int baseLength = value.length;
      if (compressionRatio_ < 1.0d) {
        baseLength = (int) (compressionRatio_ * value.length + 0.5);
      }
      if (baseLength <= 0) {
        baseLength = 1;
      }
      int pos;
      for (pos = 0; pos < baseLength; ++pos) {
        value[pos] = (byte) (' ' + rand_.nextInt(95));  // ' ' .. '~'
      }
      while (pos < value.length) {
        System.arraycopy(value, 0, value, pos,
                         Math.min(baseLength, value.length - pos));
        pos += baseLength;
      }
    }

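    // Copies the next value.length bytes from the shared data pool into
    // 'value', wrapping back to the start when the pool is exhausted.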
    private void generate(byte[] value) {
      if (position_ + value.length > data_.length) {
        position_ = 0;
        assert(value.length <= data_.length);
      }
      position_ += value.length;
      System.arraycopy(data_, position_ - value.length,
                       value, 0, value.length);
    }
  }

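  // Both accessors synchronize on finishLock_ so that worker threads
  // observe a consistent view of the finished flag.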
  boolean isFinished() {
    synchronized(finishLock_) {
      return isFinished_;
    }
  }

  void setFinished(boolean flag) {
    synchronized(finishLock_) {
      isFinished_ = flag;
    }
  }

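  // Benchmark configuration and state, populated from the parsed
  // command-line flags.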
  RocksDB db_;
  final List<String> benchmarks_;
  final int num_;
  final int reads_;
  final int keySize_;
  final int valueSize_;
  final int threadNum_;
  final int writesPerSeconds_;
  final long randSeed_;
  final boolean useExisting_;
  final String databaseDir_;
  double compressionRatio_;
  RandomGenerator gen_;
  long startTime_;

  // env
  boolean useMemenv_;

  // memtable related
  final int maxWriteBufferNumber_;
  final int prefixSize_;
  final int keysPerPrefix_;
  final String memtable_;
  final long hashBucketCount_;

  // sst format related
  boolean usePlainTable_;

  Object finishLock_;
  boolean isFinished_;
  Map<Flag, Object> flags_;
  // As the lifetime of a static member matches the lifetime of the
  // program, we let its C++ pointer be disposed of in its finalizer.
  static Options defaultOptions_ = new Options();
  static BlockBasedTableConfig defaultBlockBasedTableOptions_ =
    new BlockBasedTableConfig();
  String compressionType_;
  CompressionType compression_;
}