// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
/**
 * Copyright (C) 2011 the original author or authors.
 * See the notice.md file distributed with this work for additional
 * information regarding copyright ownership.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
21 */ 22 package org.rocksdb.benchmark; 23 24 import java.io.IOException; 25 import java.lang.Runnable; 26 import java.lang.Math; 27 import java.io.File; 28 import java.lang.reflect.Constructor; 29 import java.lang.reflect.InvocationTargetException; 30 import java.nio.ByteBuffer; 31 import java.nio.file.Files; 32 import java.util.Collection; 33 import java.util.Date; 34 import java.util.EnumMap; 35 import java.util.List; 36 import java.util.Map; 37 import java.util.Random; 38 import java.util.concurrent.TimeUnit; 39 import java.util.Arrays; 40 import java.util.ArrayList; 41 import java.util.concurrent.Callable; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.ExecutorService; 44 import java.util.concurrent.Future; 45 import java.util.concurrent.TimeUnit; 46 import org.rocksdb.*; 47 import org.rocksdb.RocksMemEnv; 48 import org.rocksdb.util.SizeUnit; 49 50 class Stats { 51 int id_; 52 long start_; 53 long finish_; 54 double seconds_; 55 long done_; 56 long found_; 57 long lastOpTime_; 58 long nextReport_; 59 long bytes_; 60 StringBuilder message_; 61 boolean excludeFromMerge_; 62 63 // TODO(yhchiang): use the following arguments: 64 // (Long)Flag.stats_interval 65 // (Integer)Flag.stats_per_interval 66 Stats(int id)67 Stats(int id) { 68 id_ = id; 69 nextReport_ = 100; 70 done_ = 0; 71 bytes_ = 0; 72 seconds_ = 0; 73 start_ = System.nanoTime(); 74 lastOpTime_ = start_; 75 finish_ = start_; 76 found_ = 0; 77 message_ = new StringBuilder(""); 78 excludeFromMerge_ = false; 79 } 80 merge(final Stats other)81 void merge(final Stats other) { 82 if (other.excludeFromMerge_) { 83 return; 84 } 85 86 done_ += other.done_; 87 found_ += other.found_; 88 bytes_ += other.bytes_; 89 seconds_ += other.seconds_; 90 if (other.start_ < start_) start_ = other.start_; 91 if (other.finish_ > finish_) finish_ = other.finish_; 92 93 // Just keep the messages from one thread 94 if (message_.length() == 0) { 95 message_ = other.message_; 96 } 97 } 98 stop()99 void 
stop() { 100 finish_ = System.nanoTime(); 101 seconds_ = (double) (finish_ - start_) * 1e-9; 102 } 103 addMessage(String msg)104 void addMessage(String msg) { 105 if (message_.length() > 0) { 106 message_.append(" "); 107 } 108 message_.append(msg); 109 } 110 setId(int id)111 void setId(int id) { id_ = id; } setExcludeFromMerge()112 void setExcludeFromMerge() { excludeFromMerge_ = true; } 113 finishedSingleOp(int bytes)114 void finishedSingleOp(int bytes) { 115 done_++; 116 lastOpTime_ = System.nanoTime(); 117 bytes_ += bytes; 118 if (done_ >= nextReport_) { 119 if (nextReport_ < 1000) { 120 nextReport_ += 100; 121 } else if (nextReport_ < 5000) { 122 nextReport_ += 500; 123 } else if (nextReport_ < 10000) { 124 nextReport_ += 1000; 125 } else if (nextReport_ < 50000) { 126 nextReport_ += 5000; 127 } else if (nextReport_ < 100000) { 128 nextReport_ += 10000; 129 } else if (nextReport_ < 500000) { 130 nextReport_ += 50000; 131 } else { 132 nextReport_ += 100000; 133 } 134 System.err.printf("... Task %s finished %d ops%30s\r", id_, done_, ""); 135 } 136 } 137 report(String name)138 void report(String name) { 139 // Pretend at least one op was done in case we are running a benchmark 140 // that does not call FinishedSingleOp(). 141 if (done_ < 1) done_ = 1; 142 143 StringBuilder extra = new StringBuilder(""); 144 if (bytes_ > 0) { 145 // Rate is computed on actual elapsed time, not the sum of per-thread 146 // elapsed times. 147 double elapsed = (finish_ - start_) * 1e-9; 148 extra.append(String.format("%6.1f MB/s", (bytes_ / 1048576.0) / elapsed)); 149 } 150 extra.append(message_.toString()); 151 double elapsed = (finish_ - start_); 152 double throughput = (double) done_ / (elapsed * 1e-9); 153 154 System.out.format("%-12s : %11.3f micros/op %d ops/sec;%s%s\n", 155 name, (elapsed * 1e-6) / done_, 156 (long) throughput, (extra.length() == 0 ? 
"" : " "), extra.toString()); 157 } 158 } 159 160 public class DbBenchmark { 161 enum Order { 162 SEQUENTIAL, 163 RANDOM 164 } 165 166 enum DBState { 167 FRESH, 168 EXISTING 169 } 170 171 static { RocksDB.loadLibrary()172 RocksDB.loadLibrary(); 173 } 174 175 abstract class BenchmarkTask implements Callable<Stats> { 176 // TODO(yhchiang): use (Integer)Flag.perf_level. BenchmarkTask( int tid, long randSeed, long numEntries, long keyRange)177 public BenchmarkTask( 178 int tid, long randSeed, long numEntries, long keyRange) { 179 tid_ = tid; 180 rand_ = new Random(randSeed + tid * 1000); 181 numEntries_ = numEntries; 182 keyRange_ = keyRange; 183 stats_ = new Stats(tid); 184 } 185 call()186 @Override public Stats call() throws RocksDBException { 187 stats_.start_ = System.nanoTime(); 188 runTask(); 189 stats_.finish_ = System.nanoTime(); 190 return stats_; 191 } 192 runTask()193 abstract protected void runTask() throws RocksDBException; 194 195 protected int tid_; 196 protected Random rand_; 197 protected long numEntries_; 198 protected long keyRange_; 199 protected Stats stats_; 200 getFixedKey(byte[] key, long sn)201 protected void getFixedKey(byte[] key, long sn) { 202 generateKeyFromLong(key, sn); 203 } 204 getRandomKey(byte[] key, long range)205 protected void getRandomKey(byte[] key, long range) { 206 generateKeyFromLong(key, Math.abs(rand_.nextLong() % range)); 207 } 208 } 209 210 abstract class WriteTask extends BenchmarkTask { WriteTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch)211 public WriteTask( 212 int tid, long randSeed, long numEntries, long keyRange, 213 WriteOptions writeOpt, long entriesPerBatch) { 214 super(tid, randSeed, numEntries, keyRange); 215 writeOpt_ = writeOpt; 216 entriesPerBatch_ = entriesPerBatch; 217 maxWritesPerSecond_ = -1; 218 } 219 WriteTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond)220 
public WriteTask( 221 int tid, long randSeed, long numEntries, long keyRange, 222 WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond) { 223 super(tid, randSeed, numEntries, keyRange); 224 writeOpt_ = writeOpt; 225 entriesPerBatch_ = entriesPerBatch; 226 maxWritesPerSecond_ = maxWritesPerSecond; 227 } 228 runTask()229 @Override public void runTask() throws RocksDBException { 230 if (numEntries_ != DbBenchmark.this.num_) { 231 stats_.message_.append(String.format(" (%d ops)", numEntries_)); 232 } 233 byte[] key = new byte[keySize_]; 234 byte[] value = new byte[valueSize_]; 235 236 try { 237 if (entriesPerBatch_ == 1) { 238 for (long i = 0; i < numEntries_; ++i) { 239 getKey(key, i, keyRange_); 240 DbBenchmark.this.gen_.generate(value); 241 db_.put(writeOpt_, key, value); 242 stats_.finishedSingleOp(keySize_ + valueSize_); 243 writeRateControl(i); 244 if (isFinished()) { 245 return; 246 } 247 } 248 } else { 249 for (long i = 0; i < numEntries_; i += entriesPerBatch_) { 250 WriteBatch batch = new WriteBatch(); 251 for (long j = 0; j < entriesPerBatch_; j++) { 252 getKey(key, i + j, keyRange_); 253 DbBenchmark.this.gen_.generate(value); 254 batch.put(key, value); 255 stats_.finishedSingleOp(keySize_ + valueSize_); 256 } 257 db_.write(writeOpt_, batch); 258 batch.dispose(); 259 writeRateControl(i); 260 if (isFinished()) { 261 return; 262 } 263 } 264 } 265 } catch (InterruptedException e) { 266 // thread has been terminated. 
267 } 268 } 269 writeRateControl(long writeCount)270 protected void writeRateControl(long writeCount) 271 throws InterruptedException { 272 if (maxWritesPerSecond_ <= 0) return; 273 long minInterval = 274 writeCount * TimeUnit.SECONDS.toNanos(1) / maxWritesPerSecond_; 275 long interval = System.nanoTime() - stats_.start_; 276 if (minInterval - interval > TimeUnit.MILLISECONDS.toNanos(1)) { 277 TimeUnit.NANOSECONDS.sleep(minInterval - interval); 278 } 279 } 280 getKey(byte[] key, long id, long range)281 abstract protected void getKey(byte[] key, long id, long range); 282 protected WriteOptions writeOpt_; 283 protected long entriesPerBatch_; 284 protected long maxWritesPerSecond_; 285 } 286 287 class WriteSequentialTask extends WriteTask { WriteSequentialTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch)288 public WriteSequentialTask( 289 int tid, long randSeed, long numEntries, long keyRange, 290 WriteOptions writeOpt, long entriesPerBatch) { 291 super(tid, randSeed, numEntries, keyRange, 292 writeOpt, entriesPerBatch); 293 } WriteSequentialTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond)294 public WriteSequentialTask( 295 int tid, long randSeed, long numEntries, long keyRange, 296 WriteOptions writeOpt, long entriesPerBatch, 297 long maxWritesPerSecond) { 298 super(tid, randSeed, numEntries, keyRange, 299 writeOpt, entriesPerBatch, 300 maxWritesPerSecond); 301 } getKey(byte[] key, long id, long range)302 @Override protected void getKey(byte[] key, long id, long range) { 303 getFixedKey(key, id); 304 } 305 } 306 307 class WriteRandomTask extends WriteTask { WriteRandomTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch)308 public WriteRandomTask( 309 int tid, long randSeed, long numEntries, long keyRange, 310 WriteOptions writeOpt, long entriesPerBatch) { 311 super(tid, 
randSeed, numEntries, keyRange, 312 writeOpt, entriesPerBatch); 313 } WriteRandomTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond)314 public WriteRandomTask( 315 int tid, long randSeed, long numEntries, long keyRange, 316 WriteOptions writeOpt, long entriesPerBatch, 317 long maxWritesPerSecond) { 318 super(tid, randSeed, numEntries, keyRange, 319 writeOpt, entriesPerBatch, 320 maxWritesPerSecond); 321 } getKey(byte[] key, long id, long range)322 @Override protected void getKey(byte[] key, long id, long range) { 323 getRandomKey(key, range); 324 } 325 } 326 327 class WriteUniqueRandomTask extends WriteTask { 328 static final int MAX_BUFFER_SIZE = 10000000; WriteUniqueRandomTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch)329 public WriteUniqueRandomTask( 330 int tid, long randSeed, long numEntries, long keyRange, 331 WriteOptions writeOpt, long entriesPerBatch) { 332 super(tid, randSeed, numEntries, keyRange, 333 writeOpt, entriesPerBatch); 334 initRandomKeySequence(); 335 } WriteUniqueRandomTask( int tid, long randSeed, long numEntries, long keyRange, WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond)336 public WriteUniqueRandomTask( 337 int tid, long randSeed, long numEntries, long keyRange, 338 WriteOptions writeOpt, long entriesPerBatch, 339 long maxWritesPerSecond) { 340 super(tid, randSeed, numEntries, keyRange, 341 writeOpt, entriesPerBatch, 342 maxWritesPerSecond); 343 initRandomKeySequence(); 344 } getKey(byte[] key, long id, long range)345 @Override protected void getKey(byte[] key, long id, long range) { 346 generateKeyFromLong(key, nextUniqueRandom()); 347 } 348 initRandomKeySequence()349 protected void initRandomKeySequence() { 350 bufferSize_ = MAX_BUFFER_SIZE; 351 if (bufferSize_ > keyRange_) { 352 bufferSize_ = (int) keyRange_; 353 } 354 currentKeyCount_ = bufferSize_; 355 keyBuffer_ = new 
long[MAX_BUFFER_SIZE]; 356 for (int k = 0; k < bufferSize_; ++k) { 357 keyBuffer_[k] = k; 358 } 359 } 360 361 /** 362 * Semi-randomly return the next unique key. It is guaranteed to be 363 * fully random if keyRange_ <= MAX_BUFFER_SIZE. 364 */ nextUniqueRandom()365 long nextUniqueRandom() { 366 if (bufferSize_ == 0) { 367 System.err.println("bufferSize_ == 0."); 368 return 0; 369 } 370 int r = rand_.nextInt(bufferSize_); 371 // randomly pick one from the keyBuffer 372 long randKey = keyBuffer_[r]; 373 if (currentKeyCount_ < keyRange_) { 374 // if we have not yet inserted all keys, insert next new key to [r]. 375 keyBuffer_[r] = currentKeyCount_++; 376 } else { 377 // move the last element to [r] and decrease the size by 1. 378 keyBuffer_[r] = keyBuffer_[--bufferSize_]; 379 } 380 return randKey; 381 } 382 383 int bufferSize_; 384 long currentKeyCount_; 385 long[] keyBuffer_; 386 } 387 388 class ReadRandomTask extends BenchmarkTask { ReadRandomTask( int tid, long randSeed, long numEntries, long keyRange)389 public ReadRandomTask( 390 int tid, long randSeed, long numEntries, long keyRange) { 391 super(tid, randSeed, numEntries, keyRange); 392 } runTask()393 @Override public void runTask() throws RocksDBException { 394 byte[] key = new byte[keySize_]; 395 byte[] value = new byte[valueSize_]; 396 for (long i = 0; i < numEntries_; i++) { 397 getRandomKey(key, keyRange_); 398 int len = db_.get(key, value); 399 if (len != RocksDB.NOT_FOUND) { 400 stats_.found_++; 401 stats_.finishedSingleOp(keySize_ + valueSize_); 402 } else { 403 stats_.finishedSingleOp(keySize_); 404 } 405 if (isFinished()) { 406 return; 407 } 408 } 409 } 410 } 411 412 class ReadSequentialTask extends BenchmarkTask { ReadSequentialTask( int tid, long randSeed, long numEntries, long keyRange)413 public ReadSequentialTask( 414 int tid, long randSeed, long numEntries, long keyRange) { 415 super(tid, randSeed, numEntries, keyRange); 416 } runTask()417 @Override public void runTask() throws RocksDBException { 
418 RocksIterator iter = db_.newIterator(); 419 long i; 420 for (iter.seekToFirst(), i = 0; 421 iter.isValid() && i < numEntries_; 422 iter.next(), ++i) { 423 stats_.found_++; 424 stats_.finishedSingleOp(iter.key().length + iter.value().length); 425 if (isFinished()) { 426 iter.dispose(); 427 return; 428 } 429 } 430 iter.dispose(); 431 } 432 } 433 DbBenchmark(Map<Flag, Object> flags)434 public DbBenchmark(Map<Flag, Object> flags) throws Exception { 435 benchmarks_ = (List<String>) flags.get(Flag.benchmarks); 436 num_ = (Integer) flags.get(Flag.num); 437 threadNum_ = (Integer) flags.get(Flag.threads); 438 reads_ = (Integer) (flags.get(Flag.reads) == null ? 439 flags.get(Flag.num) : flags.get(Flag.reads)); 440 keySize_ = (Integer) flags.get(Flag.key_size); 441 valueSize_ = (Integer) flags.get(Flag.value_size); 442 compressionRatio_ = (Double) flags.get(Flag.compression_ratio); 443 useExisting_ = (Boolean) flags.get(Flag.use_existing_db); 444 randSeed_ = (Long) flags.get(Flag.seed); 445 databaseDir_ = (String) flags.get(Flag.db); 446 writesPerSeconds_ = (Integer) flags.get(Flag.writes_per_second); 447 memtable_ = (String) flags.get(Flag.memtablerep); 448 maxWriteBufferNumber_ = (Integer) flags.get(Flag.max_write_buffer_number); 449 prefixSize_ = (Integer) flags.get(Flag.prefix_size); 450 keysPerPrefix_ = (Integer) flags.get(Flag.keys_per_prefix); 451 hashBucketCount_ = (Long) flags.get(Flag.hash_bucket_count); 452 usePlainTable_ = (Boolean) flags.get(Flag.use_plain_table); 453 useMemenv_ = (Boolean) flags.get(Flag.use_mem_env); 454 flags_ = flags; 455 finishLock_ = new Object(); 456 // options.setPrefixSize((Integer)flags_.get(Flag.prefix_size)); 457 // options.setKeysPerPrefix((Long)flags_.get(Flag.keys_per_prefix)); 458 compressionType_ = (String) flags.get(Flag.compression_type); 459 compression_ = CompressionType.NO_COMPRESSION; 460 try { 461 if (compressionType_!=null) { 462 final CompressionType compressionType = 463 
CompressionType.getCompressionType(compressionType_); 464 if (compressionType != null && 465 compressionType != CompressionType.NO_COMPRESSION) { 466 System.loadLibrary(compressionType.getLibraryName()); 467 } 468 469 } 470 } catch (UnsatisfiedLinkError e) { 471 System.err.format("Unable to load %s library:%s%n" + 472 "No compression is used.%n", 473 compressionType_, e.toString()); 474 compressionType_ = "none"; 475 } 476 gen_ = new RandomGenerator(randSeed_, compressionRatio_); 477 } 478 prepareReadOptions(ReadOptions options)479 private void prepareReadOptions(ReadOptions options) { 480 options.setVerifyChecksums((Boolean)flags_.get(Flag.verify_checksum)); 481 options.setTailing((Boolean)flags_.get(Flag.use_tailing_iterator)); 482 } 483 prepareWriteOptions(WriteOptions options)484 private void prepareWriteOptions(WriteOptions options) { 485 options.setSync((Boolean)flags_.get(Flag.sync)); 486 options.setDisableWAL((Boolean)flags_.get(Flag.disable_wal)); 487 } 488 prepareOptions(Options options)489 private void prepareOptions(Options options) throws RocksDBException { 490 if (!useExisting_) { 491 options.setCreateIfMissing(true); 492 } else { 493 options.setCreateIfMissing(false); 494 } 495 if (useMemenv_) { 496 options.setEnv(new RocksMemEnv(Env.getDefault())); 497 } 498 switch (memtable_) { 499 case "skip_list": 500 options.setMemTableConfig(new SkipListMemTableConfig()); 501 break; 502 case "vector": 503 options.setMemTableConfig(new VectorMemTableConfig()); 504 break; 505 case "hash_linkedlist": 506 options.setMemTableConfig( 507 new HashLinkedListMemTableConfig() 508 .setBucketCount(hashBucketCount_)); 509 options.useFixedLengthPrefixExtractor(prefixSize_); 510 break; 511 case "hash_skiplist": 512 case "prefix_hash": 513 options.setMemTableConfig( 514 new HashSkipListMemTableConfig() 515 .setBucketCount(hashBucketCount_)); 516 options.useFixedLengthPrefixExtractor(prefixSize_); 517 break; 518 default: 519 System.err.format( 520 "unable to detect the 
specified memtable, " + 521 "use the default memtable factory %s%n", 522 options.memTableFactoryName()); 523 break; 524 } 525 if (usePlainTable_) { 526 options.setTableFormatConfig( 527 new PlainTableConfig().setKeySize(keySize_)); 528 } else { 529 BlockBasedTableConfig table_options = new BlockBasedTableConfig(); 530 table_options.setBlockSize((Long)flags_.get(Flag.block_size)) 531 .setBlockCacheSize((Long)flags_.get(Flag.cache_size)) 532 .setCacheNumShardBits( 533 (Integer)flags_.get(Flag.cache_numshardbits)); 534 options.setTableFormatConfig(table_options); 535 } 536 options.setWriteBufferSize( 537 (Long)flags_.get(Flag.write_buffer_size)); 538 options.setMaxWriteBufferNumber( 539 (Integer)flags_.get(Flag.max_write_buffer_number)); 540 options.setMaxBackgroundCompactions( 541 (Integer)flags_.get(Flag.max_background_compactions)); 542 options.getEnv().setBackgroundThreads( 543 (Integer)flags_.get(Flag.max_background_compactions)); 544 options.setMaxBackgroundFlushes( 545 (Integer)flags_.get(Flag.max_background_flushes)); 546 options.setMaxBackgroundJobs((Integer) flags_.get(Flag.max_background_jobs)); 547 options.setMaxOpenFiles( 548 (Integer)flags_.get(Flag.open_files)); 549 options.setUseFsync( 550 (Boolean)flags_.get(Flag.use_fsync)); 551 options.setWalDir( 552 (String)flags_.get(Flag.wal_dir)); 553 options.setDeleteObsoleteFilesPeriodMicros( 554 (Integer)flags_.get(Flag.delete_obsolete_files_period_micros)); 555 options.setTableCacheNumshardbits( 556 (Integer)flags_.get(Flag.table_cache_numshardbits)); 557 options.setAllowMmapReads( 558 (Boolean)flags_.get(Flag.mmap_read)); 559 options.setAllowMmapWrites( 560 (Boolean)flags_.get(Flag.mmap_write)); 561 options.setAdviseRandomOnOpen( 562 (Boolean)flags_.get(Flag.advise_random_on_open)); 563 options.setUseAdaptiveMutex( 564 (Boolean)flags_.get(Flag.use_adaptive_mutex)); 565 options.setBytesPerSync( 566 (Long)flags_.get(Flag.bytes_per_sync)); 567 options.setBloomLocality( 568 
(Integer)flags_.get(Flag.bloom_locality)); 569 options.setMinWriteBufferNumberToMerge( 570 (Integer)flags_.get(Flag.min_write_buffer_number_to_merge)); 571 options.setMemtablePrefixBloomSizeRatio((Double) flags_.get(Flag.memtable_bloom_size_ratio)); 572 options.setNumLevels( 573 (Integer)flags_.get(Flag.num_levels)); 574 options.setTargetFileSizeBase( 575 (Integer)flags_.get(Flag.target_file_size_base)); 576 options.setTargetFileSizeMultiplier((Integer)flags_.get(Flag.target_file_size_multiplier)); 577 options.setMaxBytesForLevelBase( 578 (Integer)flags_.get(Flag.max_bytes_for_level_base)); 579 options.setMaxBytesForLevelMultiplier((Double) flags_.get(Flag.max_bytes_for_level_multiplier)); 580 options.setLevelZeroStopWritesTrigger( 581 (Integer)flags_.get(Flag.level0_stop_writes_trigger)); 582 options.setLevelZeroSlowdownWritesTrigger( 583 (Integer)flags_.get(Flag.level0_slowdown_writes_trigger)); 584 options.setLevelZeroFileNumCompactionTrigger( 585 (Integer)flags_.get(Flag.level0_file_num_compaction_trigger)); 586 options.setMaxCompactionBytes( 587 (Long) flags_.get(Flag.max_compaction_bytes)); 588 options.setDisableAutoCompactions( 589 (Boolean)flags_.get(Flag.disable_auto_compactions)); 590 options.setMaxSuccessiveMerges( 591 (Integer)flags_.get(Flag.max_successive_merges)); 592 options.setWalTtlSeconds((Long)flags_.get(Flag.wal_ttl_seconds)); 593 options.setWalSizeLimitMB((Long)flags_.get(Flag.wal_size_limit_MB)); 594 if(flags_.get(Flag.java_comparator) != null) { 595 options.setComparator( 596 (AbstractComparator)flags_.get(Flag.java_comparator)); 597 } 598 599 /* TODO(yhchiang): enable the following parameters 600 options.setCompressionType((String)flags_.get(Flag.compression_type)); 601 options.setCompressionLevel((Integer)flags_.get(Flag.compression_level)); 602 options.setMinLevelToCompress((Integer)flags_.get(Flag.min_level_to_compress)); 603 options.setHdfs((String)flags_.get(Flag.hdfs)); // env 604 
options.setStatistics((Boolean)flags_.get(Flag.statistics)); 605 options.setUniversalSizeRatio( 606 (Integer)flags_.get(Flag.universal_size_ratio)); 607 options.setUniversalMinMergeWidth( 608 (Integer)flags_.get(Flag.universal_min_merge_width)); 609 options.setUniversalMaxMergeWidth( 610 (Integer)flags_.get(Flag.universal_max_merge_width)); 611 options.setUniversalMaxSizeAmplificationPercent( 612 (Integer)flags_.get(Flag.universal_max_size_amplification_percent)); 613 options.setUniversalCompressionSizePercent( 614 (Integer)flags_.get(Flag.universal_compression_size_percent)); 615 // TODO(yhchiang): add RocksDB.openForReadOnly() to enable Flag.readonly 616 // TODO(yhchiang): enable Flag.merge_operator by switch 617 options.setAccessHintOnCompactionStart( 618 (String)flags_.get(Flag.compaction_fadvice)); 619 // available values of fadvice are "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" for fadvice 620 */ 621 } 622 run()623 private void run() throws RocksDBException { 624 if (!useExisting_) { 625 destroyDb(); 626 } 627 Options options = new Options(); 628 prepareOptions(options); 629 open(options); 630 631 printHeader(options); 632 633 for (String benchmark : benchmarks_) { 634 List<Callable<Stats>> tasks = new ArrayList<Callable<Stats>>(); 635 List<Callable<Stats>> bgTasks = new ArrayList<Callable<Stats>>(); 636 WriteOptions writeOpt = new WriteOptions(); 637 prepareWriteOptions(writeOpt); 638 ReadOptions readOpt = new ReadOptions(); 639 prepareReadOptions(readOpt); 640 int currentTaskId = 0; 641 boolean known = true; 642 643 switch (benchmark) { 644 case "fillseq": 645 tasks.add(new WriteSequentialTask( 646 currentTaskId++, randSeed_, num_, num_, writeOpt, 1)); 647 break; 648 case "fillbatch": 649 tasks.add( 650 new WriteSequentialTask(currentTaskId++, randSeed_, num_, num_, writeOpt, 1000)); 651 break; 652 case "fillrandom": 653 tasks.add(new WriteRandomTask( 654 currentTaskId++, randSeed_, num_, num_, writeOpt, 1)); 655 break; 656 case "filluniquerandom": 657 
tasks.add(new WriteUniqueRandomTask( 658 currentTaskId++, randSeed_, num_, num_, writeOpt, 1)); 659 break; 660 case "fillsync": 661 writeOpt.setSync(true); 662 tasks.add(new WriteRandomTask( 663 currentTaskId++, randSeed_, num_ / 1000, num_ / 1000, 664 writeOpt, 1)); 665 break; 666 case "readseq": 667 for (int t = 0; t < threadNum_; ++t) { 668 tasks.add(new ReadSequentialTask( 669 currentTaskId++, randSeed_, reads_ / threadNum_, num_)); 670 } 671 break; 672 case "readrandom": 673 for (int t = 0; t < threadNum_; ++t) { 674 tasks.add(new ReadRandomTask( 675 currentTaskId++, randSeed_, reads_ / threadNum_, num_)); 676 } 677 break; 678 case "readwhilewriting": 679 WriteTask writeTask = new WriteRandomTask( 680 -1, randSeed_, Long.MAX_VALUE, num_, writeOpt, 1, writesPerSeconds_); 681 writeTask.stats_.setExcludeFromMerge(); 682 bgTasks.add(writeTask); 683 for (int t = 0; t < threadNum_; ++t) { 684 tasks.add(new ReadRandomTask( 685 currentTaskId++, randSeed_, reads_ / threadNum_, num_)); 686 } 687 break; 688 case "readhot": 689 for (int t = 0; t < threadNum_; ++t) { 690 tasks.add(new ReadRandomTask( 691 currentTaskId++, randSeed_, reads_ / threadNum_, num_ / 100)); 692 } 693 break; 694 case "delete": 695 destroyDb(); 696 open(options); 697 break; 698 default: 699 known = false; 700 System.err.println("Unknown benchmark: " + benchmark); 701 break; 702 } 703 if (known) { 704 ExecutorService executor = Executors.newCachedThreadPool(); 705 ExecutorService bgExecutor = Executors.newCachedThreadPool(); 706 try { 707 // measure only the main executor time 708 List<Future<Stats>> bgResults = new ArrayList<Future<Stats>>(); 709 for (Callable bgTask : bgTasks) { 710 bgResults.add(bgExecutor.submit(bgTask)); 711 } 712 start(); 713 List<Future<Stats>> results = executor.invokeAll(tasks); 714 executor.shutdown(); 715 boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS); 716 if (!finished) { 717 System.out.format( 718 "Benchmark %s was not finished before timeout.", 719 
benchmark); 720 executor.shutdownNow(); 721 } 722 setFinished(true); 723 bgExecutor.shutdown(); 724 finished = bgExecutor.awaitTermination(10, TimeUnit.SECONDS); 725 if (!finished) { 726 System.out.format( 727 "Benchmark %s was not finished before timeout.", 728 benchmark); 729 bgExecutor.shutdownNow(); 730 } 731 732 stop(benchmark, results, currentTaskId); 733 } catch (InterruptedException e) { 734 System.err.println(e); 735 } 736 } 737 writeOpt.dispose(); 738 readOpt.dispose(); 739 } 740 options.dispose(); 741 db_.close(); 742 } 743 printHeader(Options options)744 private void printHeader(Options options) { 745 int kKeySize = 16; 746 System.out.printf("Keys: %d bytes each\n", kKeySize); 747 System.out.printf("Values: %d bytes each (%d bytes after compression)\n", 748 valueSize_, 749 (int) (valueSize_ * compressionRatio_ + 0.5)); 750 System.out.printf("Entries: %d\n", num_); 751 System.out.printf("RawSize: %.1f MB (estimated)\n", 752 ((double)(kKeySize + valueSize_) * num_) / SizeUnit.MB); 753 System.out.printf("FileSize: %.1f MB (estimated)\n", 754 (((kKeySize + valueSize_ * compressionRatio_) * num_) / SizeUnit.MB)); 755 System.out.format("Memtable Factory: %s%n", options.memTableFactoryName()); 756 System.out.format("Prefix: %d bytes%n", prefixSize_); 757 System.out.format("Compression: %s%n", compressionType_); 758 printWarnings(); 759 System.out.printf("------------------------------------------------\n"); 760 } 761 printWarnings()762 void printWarnings() { 763 boolean assertsEnabled = false; 764 assert assertsEnabled = true; // Intentional side effect!!! 
765 if (assertsEnabled) { 766 System.out.printf( 767 "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); 768 } 769 } 770 open(Options options)771 private void open(Options options) throws RocksDBException { 772 System.out.println("Using database directory: " + databaseDir_); 773 db_ = RocksDB.open(options, databaseDir_); 774 } 775 start()776 private void start() { 777 setFinished(false); 778 startTime_ = System.nanoTime(); 779 } 780 stop( String benchmark, List<Future<Stats>> results, int concurrentThreads)781 private void stop( 782 String benchmark, List<Future<Stats>> results, int concurrentThreads) { 783 long endTime = System.nanoTime(); 784 double elapsedSeconds = 785 1.0d * (endTime - startTime_) / TimeUnit.SECONDS.toNanos(1); 786 787 Stats stats = new Stats(-1); 788 int taskFinishedCount = 0; 789 for (Future<Stats> result : results) { 790 if (result.isDone()) { 791 try { 792 Stats taskStats = result.get(3, TimeUnit.SECONDS); 793 if (!result.isCancelled()) { 794 taskFinishedCount++; 795 } 796 stats.merge(taskStats); 797 } catch (Exception e) { 798 // then it's not successful, the output will indicate this 799 } 800 } 801 } 802 String extra = ""; 803 if (benchmark.indexOf("read") >= 0) { 804 extra = String.format(" %d / %d found; ", stats.found_, stats.done_); 805 } else { 806 extra = String.format(" %d ops done; ", stats.done_); 807 } 808 809 System.out.printf( 810 "%-16s : %11.5f micros/op; %6.1f MB/s;%s %d / %d task(s) finished.\n", 811 benchmark, elapsedSeconds / stats.done_ * 1e6, 812 (stats.bytes_ / 1048576.0) / elapsedSeconds, extra, 813 taskFinishedCount, concurrentThreads); 814 } 815 generateKeyFromLong(byte[] slice, long n)816 public void generateKeyFromLong(byte[] slice, long n) { 817 assert(n >= 0); 818 int startPos = 0; 819 820 if (keysPerPrefix_ > 0) { 821 long numPrefix = (num_ + keysPerPrefix_ - 1) / keysPerPrefix_; 822 long prefix = n % numPrefix; 823 int bytesToFill = Math.min(prefixSize_, 8); 824 for (int i = 0; i < 
bytesToFill; ++i) { 825 slice[i] = (byte) (prefix % 256); 826 prefix /= 256; 827 } 828 for (int i = 8; i < bytesToFill; ++i) { 829 slice[i] = '0'; 830 } 831 startPos = bytesToFill; 832 } 833 834 for (int i = slice.length - 1; i >= startPos; --i) { 835 slice[i] = (byte) ('0' + (n % 10)); 836 n /= 10; 837 } 838 } 839 destroyDb()840 private void destroyDb() { 841 if (db_ != null) { 842 db_.close(); 843 } 844 // TODO(yhchiang): develop our own FileUtil 845 // FileUtil.deleteDir(databaseDir_); 846 } 847 printStats()848 private void printStats() { 849 } 850 printHelp()851 static void printHelp() { 852 System.out.println("usage:"); 853 for (Flag flag : Flag.values()) { 854 System.out.format(" --%s%n\t%s%n", 855 flag.name(), 856 flag.desc()); 857 if (flag.getDefaultValue() != null) { 858 System.out.format("\tDEFAULT: %s%n", 859 flag.getDefaultValue().toString()); 860 } 861 } 862 } 863 main(String[] args)864 public static void main(String[] args) throws Exception { 865 Map<Flag, Object> flags = new EnumMap<Flag, Object>(Flag.class); 866 for (Flag flag : Flag.values()) { 867 if (flag.getDefaultValue() != null) { 868 flags.put(flag, flag.getDefaultValue()); 869 } 870 } 871 for (String arg : args) { 872 boolean valid = false; 873 if (arg.equals("--help") || arg.equals("-h")) { 874 printHelp(); 875 System.exit(0); 876 } 877 if (arg.startsWith("--")) { 878 try { 879 String[] parts = arg.substring(2).split("="); 880 if (parts.length >= 1) { 881 Flag key = Flag.valueOf(parts[0]); 882 if (key != null) { 883 Object value = null; 884 if (parts.length >= 2) { 885 value = key.parseValue(parts[1]); 886 } 887 flags.put(key, value); 888 valid = true; 889 } 890 } 891 } 892 catch (Exception e) { 893 } 894 } 895 if (!valid) { 896 System.err.println("Invalid argument " + arg); 897 System.exit(1); 898 } 899 } 900 new DbBenchmark(flags).run(); 901 } 902 903 private enum Flag { 904 benchmarks(Arrays.asList("fillseq", "readrandom", "fillrandom"), 905 "Comma-separated list of operations to run in 
the specified order\n" 906 + "\tActual benchmarks:\n" 907 + "\t\tfillseq -- write N values in sequential key order in async mode.\n" 908 + "\t\tfillrandom -- write N values in random key order in async mode.\n" 909 + "\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" 910 + "\t\t in sequential key order in sync mode.\n" 911 + "\t\tfillsync -- write N/100 values in random key order in sync mode.\n" 912 + "\t\tfill100K -- write N/1000 100K values in random order in async mode.\n" 913 + "\t\treadseq -- read N times sequentially.\n" 914 + "\t\treadrandom -- read N times in random order.\n" 915 + "\t\treadhot -- read N times in random order from 1% section of DB.\n" 916 + "\t\treadwhilewriting -- measure the read performance of multiple readers\n" 917 + "\t\t with a bg single writer. The write rate of the bg\n" 918 + "\t\t is capped by --writes_per_second.\n" 919 + "\tMeta Operations:\n" 920 + "\t\tdelete -- delete DB") { parseValue(String value)921 @Override public Object parseValue(String value) { 922 return new ArrayList<String>(Arrays.asList(value.split(","))); 923 } 924 }, 925 compression_ratio(0.5d, 926 "Arrange to generate values that shrink to this fraction of\n" + 927 "\ttheir original size after compression.") { parseValue(String value)928 @Override public Object parseValue(String value) { 929 return Double.parseDouble(value); 930 } 931 }, use_existing_db(false, R + R + R)932 use_existing_db(false, 933 "If true, do not destroy the existing database. 
If you set this\n" + 934 "\tflag and also specify a benchmark that wants a fresh database,\n" + 935 "\tthat benchmark will fail.") { 936 @Override public Object parseValue(String value) { 937 return parseBoolean(value); 938 } 939 }, 940 num(1000000, 941 "Number of key/values to place in database.") { parseValue(String value)942 @Override public Object parseValue(String value) { 943 return Integer.parseInt(value); 944 } 945 }, 946 threads(1, 947 "Number of concurrent threads to run.") { parseValue(String value)948 @Override public Object parseValue(String value) { 949 return Integer.parseInt(value); 950 } 951 }, reads(null, R)952 reads(null, 953 "Number of read operations to do. If negative, do --nums reads.") { 954 @Override public Object parseValue(String value) { 955 return Integer.parseInt(value); 956 } 957 }, 958 key_size(16, 959 "The size of each key in bytes.") { parseValue(String value)960 @Override public Object parseValue(String value) { 961 return Integer.parseInt(value); 962 } 963 }, 964 value_size(100, 965 "The size of each value in bytes.") { parseValue(String value)966 @Override public Object parseValue(String value) { 967 return Integer.parseInt(value); 968 } 969 }, 970 write_buffer_size(4L * SizeUnit.MB, 971 "Number of bytes to buffer in memtable before compacting\n" + 972 "\t(initialized to default value by 'main'.)") { parseValue(String value)973 @Override public Object parseValue(String value) { 974 return Long.parseLong(value); 975 } 976 }, 977 max_write_buffer_number(2, 978 "The number of in-memory memtables. 
Each memtable is of size\n" + 979 "\twrite_buffer_size.") { parseValue(String value)980 @Override public Object parseValue(String value) { 981 return Integer.parseInt(value); 982 } 983 }, 984 prefix_size(0, "Controls the prefix size for HashSkipList, HashLinkedList,\n" + 985 "\tand plain table.") { parseValue(String value)986 @Override public Object parseValue(String value) { 987 return Integer.parseInt(value); 988 } 989 }, 990 keys_per_prefix(0, "Controls the average number of keys generated\n" + 991 "\tper prefix, 0 means no special handling of the prefix,\n" + 992 "\ti.e. use the prefix comes with the generated random number.") { parseValue(String value)993 @Override public Object parseValue(String value) { 994 return Integer.parseInt(value); 995 } 996 }, 997 memtablerep("skip_list", 998 "The memtable format. Available options are\n" + 999 "\tskip_list,\n" + 1000 "\tvector,\n" + 1001 "\thash_linkedlist,\n" + 1002 "\thash_skiplist (prefix_hash.)") { parseValue(String value)1003 @Override public Object parseValue(String value) { 1004 return value; 1005 } 1006 }, hash_bucket_count(SizeUnit.MB, R + R + R)1007 hash_bucket_count(SizeUnit.MB, 1008 "The number of hash buckets used in the hash-bucket-based\n" + 1009 "\tmemtables. Memtables that currently support this argument are\n" + 1010 "\thash_linkedlist and hash_skiplist.") { 1011 @Override public Object parseValue(String value) { 1012 return Long.parseLong(value); 1013 } 1014 }, 1015 writes_per_second(10000, 1016 "The write-rate of the background writer used in the\n" + 1017 "\t`readwhilewriting` benchmark. 
Non-positive number indicates\n" + 1018 "\tusing an unbounded write-rate in `readwhilewriting` benchmark.") { parseValue(String value)1019 @Override public Object parseValue(String value) { 1020 return Integer.parseInt(value); 1021 } 1022 }, use_plain_table(false, R)1023 use_plain_table(false, 1024 "Use plain-table sst format.") { 1025 @Override public Object parseValue(String value) { 1026 return parseBoolean(value); 1027 } 1028 }, 1029 cache_size(-1L, 1030 "Number of bytes to use as a cache of uncompressed data.\n" + 1031 "\tNegative means use default settings.") { parseValue(String value)1032 @Override public Object parseValue(String value) { 1033 return Long.parseLong(value); 1034 } 1035 }, 1036 seed(0L, 1037 "Seed base for random number generators.") { parseValue(String value)1038 @Override public Object parseValue(String value) { 1039 return Long.parseLong(value); 1040 } 1041 }, 1042 num_levels(7, 1043 "The total number of levels.") { parseValue(String value)1044 @Override public Object parseValue(String value) { 1045 return Integer.parseInt(value); 1046 } 1047 }, 1048 numdistinct(1000L, 1049 "Number of distinct keys to use. 
Used in RandomWithVerify to\n" + 1050 "\tread/write on fewer keys so that gets are more likely to find the\n" + 1051 "\tkey and puts are more likely to update the same key.") { parseValue(String value)1052 @Override public Object parseValue(String value) { 1053 return Long.parseLong(value); 1054 } 1055 }, 1056 merge_keys(-1L, 1057 "Number of distinct keys to use for MergeRandom and\n" + 1058 "\tReadRandomMergeRandom.\n" + 1059 "\tIf negative, there will be FLAGS_num keys.") { parseValue(String value)1060 @Override public Object parseValue(String value) { 1061 return Long.parseLong(value); 1062 } 1063 }, 1064 bloom_locality(0,"Control bloom filter probes locality.") { parseValue(String value)1065 @Override public Object parseValue(String value) { 1066 return Integer.parseInt(value); 1067 } 1068 }, 1069 duration(0,"Time in seconds for the random-ops tests to run.\n" + 1070 "\tWhen 0 then num & reads determine the test duration.") { parseValue(String value)1071 @Override public Object parseValue(String value) { 1072 return Integer.parseInt(value); 1073 } 1074 }, 1075 num_multi_db(0, 1076 "Number of DBs used in the benchmark. 0 means single DB.") { parseValue(String value)1077 @Override public Object parseValue(String value) { 1078 return Integer.parseInt(value); 1079 } 1080 }, histogram(false,R)1081 histogram(false,"Print histogram of operation timings.") { 1082 @Override public Object parseValue(String value) { 1083 return parseBoolean(value); 1084 } 1085 }, 1086 min_write_buffer_number_to_merge( 1087 defaultOptions_.minWriteBufferNumberToMerge(), 1088 "The minimum number of write buffers that will be merged together\n" + 1089 "\tbefore writing to storage. This is cheap because it is an\n" + 1090 "\tin-memory merge. If this feature is not enabled, then all these\n" + 1091 "\twrite buffers are flushed to L0 as separate files and this\n" + 1092 "\tincreases read amplification because a get request has to check\n" + 1093 "\tin all of these files. 
Also, an in-memory merge may result in\n" + 1094 "\twriting less data to storage if there are duplicate records\n" + 1095 "\tin each of these individual write buffers.") { parseValue(String value)1096 @Override public Object parseValue(String value) { 1097 return Integer.parseInt(value); 1098 } 1099 }, 1100 max_background_compactions( 1101 defaultOptions_.maxBackgroundCompactions(), 1102 "The maximum number of concurrent background compactions\n" + 1103 "\tthat can occur in parallel.") { parseValue(String value)1104 @Override public Object parseValue(String value) { 1105 return Integer.parseInt(value); 1106 } 1107 }, 1108 max_background_flushes( 1109 defaultOptions_.maxBackgroundFlushes(), 1110 "The maximum number of concurrent background flushes\n" + 1111 "\tthat can occur in parallel.") { parseValue(String value)1112 @Override public Object parseValue(String value) { 1113 return Integer.parseInt(value); 1114 } 1115 }, 1116 max_background_jobs(defaultOptions_.maxBackgroundJobs(), 1117 "The maximum number of concurrent background jobs\n" 1118 + "\tthat can occur in parallel.") { 1119 @Override parseValue(String value)1120 public Object parseValue(String value) { 1121 return Integer.parseInt(value); 1122 } 1123 }, 1124 /* TODO(yhchiang): enable the following 1125 compaction_style((int32_t) defaultOptions_.compactionStyle(), 1126 "style of compaction: level-based vs universal.") { 1127 @Override public Object parseValue(String value) { 1128 return Integer.parseInt(value); 1129 } 1130 },*/ 1131 universal_size_ratio(0, 1132 "Percentage flexibility while comparing file size\n" + 1133 "\t(for universal compaction only).") { parseValue(String value)1134 @Override public Object parseValue(String value) { 1135 return Integer.parseInt(value); 1136 } 1137 }, 1138 universal_min_merge_width(0,"The minimum number of files in a\n" + 1139 "\tsingle compaction run (for universal compaction only).") { parseValue(String value)1140 @Override public Object parseValue(String value) { 
1141 return Integer.parseInt(value); 1142 } 1143 }, 1144 universal_max_merge_width(0,"The max number of files to compact\n" + 1145 "\tin universal style compaction.") { parseValue(String value)1146 @Override public Object parseValue(String value) { 1147 return Integer.parseInt(value); 1148 } 1149 }, 1150 universal_max_size_amplification_percent(0, 1151 "The max size amplification for universal style compaction.") { parseValue(String value)1152 @Override public Object parseValue(String value) { 1153 return Integer.parseInt(value); 1154 } 1155 }, 1156 universal_compression_size_percent(-1, 1157 "The percentage of the database to compress for universal\n" + 1158 "\tcompaction. -1 means compress everything.") { parseValue(String value)1159 @Override public Object parseValue(String value) { 1160 return Integer.parseInt(value); 1161 } 1162 }, 1163 block_size(defaultBlockBasedTableOptions_.blockSize(), 1164 "Number of bytes in a block.") { parseValue(String value)1165 @Override public Object parseValue(String value) { 1166 return Long.parseLong(value); 1167 } 1168 }, 1169 compressed_cache_size(-1L, 1170 "Number of bytes to use as a cache of compressed data.") { parseValue(String value)1171 @Override public Object parseValue(String value) { 1172 return Long.parseLong(value); 1173 } 1174 }, 1175 open_files(defaultOptions_.maxOpenFiles(), 1176 "Maximum number of files to keep open at the same time\n" + 1177 "\t(use default if == 0)") { parseValue(String value)1178 @Override public Object parseValue(String value) { 1179 return Integer.parseInt(value); 1180 } 1181 }, 1182 bloom_bits(-1,"Bloom filter bits per key. 
Negative means\n" + 1183 "\tuse default settings.") { parseValue(String value)1184 @Override public Object parseValue(String value) { 1185 return Integer.parseInt(value); 1186 } 1187 }, 1188 memtable_bloom_size_ratio(0.0d, "Ratio of memtable used by the bloom filter.\n" 1189 + "\t0 means no bloom filter.") { parseValue(String value)1190 @Override public Object parseValue(String value) { 1191 return Double.parseDouble(value); 1192 } 1193 }, 1194 cache_numshardbits(-1,"Number of shards for the block cache\n" + 1195 "\tis 2 ** cache_numshardbits. Negative means use default settings.\n" + 1196 "\tThis is applied only if FLAGS_cache_size is non-negative.") { parseValue(String value)1197 @Override public Object parseValue(String value) { 1198 return Integer.parseInt(value); 1199 } 1200 }, verify_checksum(false,R + R)1201 verify_checksum(false,"Verify checksum for every block read\n" + 1202 "\tfrom storage.") { 1203 @Override public Object parseValue(String value) { 1204 return parseBoolean(value); 1205 } 1206 }, statistics(false,R)1207 statistics(false,"Database statistics.") { 1208 @Override public Object parseValue(String value) { 1209 return parseBoolean(value); 1210 } 1211 }, 1212 writes(-1L, "Number of write operations to do. 
If negative, do\n" + 1213 "\t--num reads.") { parseValue(String value)1214 @Override public Object parseValue(String value) { 1215 return Long.parseLong(value); 1216 } 1217 }, sync(false,R)1218 sync(false,"Sync all writes to disk.") { 1219 @Override public Object parseValue(String value) { 1220 return parseBoolean(value); 1221 } 1222 }, use_fsync(false,R)1223 use_fsync(false,"If true, issue fsync instead of fdatasync.") { 1224 @Override public Object parseValue(String value) { 1225 return parseBoolean(value); 1226 } 1227 }, disable_wal(false,R)1228 disable_wal(false,"If true, do not write WAL for write.") { 1229 @Override public Object parseValue(String value) { 1230 return parseBoolean(value); 1231 } 1232 }, 1233 wal_dir("", "If not empty, use the given dir for WAL.") { parseValue(String value)1234 @Override public Object parseValue(String value) { 1235 return value; 1236 } 1237 }, 1238 target_file_size_base(2 * 1048576,"Target file size at level-1") { parseValue(String value)1239 @Override public Object parseValue(String value) { 1240 return Integer.parseInt(value); 1241 } 1242 }, 1243 target_file_size_multiplier(1, 1244 "A multiplier to compute target level-N file size (N >= 2)") { parseValue(String value)1245 @Override public Object parseValue(String value) { 1246 return Integer.parseInt(value); 1247 } 1248 }, 1249 max_bytes_for_level_base(10 * 1048576, 1250 "Max bytes for level-1") { parseValue(String value)1251 @Override public Object parseValue(String value) { 1252 return Integer.parseInt(value); 1253 } 1254 }, 1255 max_bytes_for_level_multiplier(10.0d, 1256 "A multiplier to compute max bytes for level-N (N >= 2)") { parseValue(String value)1257 @Override public Object parseValue(String value) { 1258 return Double.parseDouble(value); 1259 } 1260 }, 1261 level0_stop_writes_trigger(12,"Number of files in level-0\n" + 1262 "\tthat will trigger put stop.") { parseValue(String value)1263 @Override public Object parseValue(String value) { 1264 return 
Integer.parseInt(value); 1265 } 1266 }, 1267 level0_slowdown_writes_trigger(8,"Number of files in level-0\n" + 1268 "\tthat will slow down writes.") { parseValue(String value)1269 @Override public Object parseValue(String value) { 1270 return Integer.parseInt(value); 1271 } 1272 }, 1273 level0_file_num_compaction_trigger(4,"Number of files in level-0\n" + 1274 "\twhen compactions start.") { parseValue(String value)1275 @Override public Object parseValue(String value) { 1276 return Integer.parseInt(value); 1277 } 1278 }, 1279 readwritepercent(90,"Ratio of reads to reads/writes (expressed\n" + 1280 "\tas percentage) for the ReadRandomWriteRandom workload. The\n" + 1281 "\tdefault value 90 means 90% operations out of all reads and writes\n" + 1282 "\toperations are reads. In other words, 9 gets for every 1 put.") { parseValue(String value)1283 @Override public Object parseValue(String value) { 1284 return Integer.parseInt(value); 1285 } 1286 }, 1287 mergereadpercent(70,"Ratio of merges to merges&reads (expressed\n" + 1288 "\tas percentage) for the ReadRandomMergeRandom workload. The\n" + 1289 "\tdefault value 70 means 70% out of all read and merge operations\n" + 1290 "\tare merges. In other words, 7 merges for every 3 gets.") { parseValue(String value)1291 @Override public Object parseValue(String value) { 1292 return Integer.parseInt(value); 1293 } 1294 }, 1295 deletepercent(2,"Percentage of deletes out of reads/writes/\n" + 1296 "\tdeletes (used in RandomWithVerify only). RandomWithVerify\n" + 1297 "\tcalculates writepercent as (100 - FLAGS_readwritepercent -\n" + 1298 "\tdeletepercent), so deletepercent must be smaller than (100 -\n" + 1299 "\tFLAGS_readwritepercent)") { parseValue(String value)1300 @Override public Object parseValue(String value) { 1301 return Integer.parseInt(value); 1302 } 1303 }, 1304 delete_obsolete_files_period_micros(0,"Option to delete\n" + 1305 "\tobsolete files periodically. 
0 means that obsolete files are\n" + 1306 "\tdeleted after every compaction run.") { parseValue(String value)1307 @Override public Object parseValue(String value) { 1308 return Integer.parseInt(value); 1309 } 1310 }, 1311 compression_type("snappy", 1312 "Algorithm used to compress the database.") { parseValue(String value)1313 @Override public Object parseValue(String value) { 1314 return value; 1315 } 1316 }, 1317 compression_level(-1, 1318 "Compression level. For zlib this should be -1 for the\n" + 1319 "\tdefault level, or between 0 and 9.") { parseValue(String value)1320 @Override public Object parseValue(String value) { 1321 return Integer.parseInt(value); 1322 } 1323 }, 1324 min_level_to_compress(-1,"If non-negative, compression starts\n" + 1325 "\tfrom this level. Levels with number < min_level_to_compress are\n" + 1326 "\tnot compressed. Otherwise, apply compression_type to\n" + 1327 "\tall levels.") { parseValue(String value)1328 @Override public Object parseValue(String value) { 1329 return Integer.parseInt(value); 1330 } 1331 }, 1332 table_cache_numshardbits(4,"") { parseValue(String value)1333 @Override public Object parseValue(String value) { 1334 return Integer.parseInt(value); 1335 } 1336 }, 1337 stats_interval(0L, "Stats are reported every N operations when\n" + 1338 "\tthis is greater than zero. 
When 0 the interval grows over time.") { parseValue(String value)1339 @Override public Object parseValue(String value) { 1340 return Long.parseLong(value); 1341 } 1342 }, 1343 stats_per_interval(0,"Reports additional stats per interval when\n" + 1344 "\tthis is greater than 0.") { parseValue(String value)1345 @Override public Object parseValue(String value) { 1346 return Integer.parseInt(value); 1347 } 1348 }, 1349 perf_level(0,"Level of perf collection.") { parseValue(String value)1350 @Override public Object parseValue(String value) { 1351 return Integer.parseInt(value); 1352 } 1353 }, 1354 soft_rate_limit(0.0d,"") { parseValue(String value)1355 @Override public Object parseValue(String value) { 1356 return Double.parseDouble(value); 1357 } 1358 }, 1359 hard_rate_limit(0.0d,"When not equal to 0 this make threads\n" + 1360 "\tsleep at each stats reporting interval until the compaction\n" + 1361 "\tscore for all levels is less than or equal to this value.") { parseValue(String value)1362 @Override public Object parseValue(String value) { 1363 return Double.parseDouble(value); 1364 } 1365 }, 1366 rate_limit_delay_max_milliseconds(1000, 1367 "When hard_rate_limit is set then this is the max time a put will\n" + 1368 "\tbe stalled.") { parseValue(String value)1369 @Override public Object parseValue(String value) { 1370 return Integer.parseInt(value); 1371 } 1372 }, 1373 max_compaction_bytes(0L, "Limit number of bytes in one compaction to be lower than this\n" + 1374 "\threshold. 
But it's not guaranteed.") { parseValue(String value)1375 @Override public Object parseValue(String value) { 1376 return Long.parseLong(value); 1377 } 1378 }, readonly(false,R)1379 readonly(false,"Run read only benchmarks.") { 1380 @Override public Object parseValue(String value) { 1381 return parseBoolean(value); 1382 } 1383 }, disable_auto_compactions(false,R)1384 disable_auto_compactions(false,"Do not auto trigger compactions.") { 1385 @Override public Object parseValue(String value) { 1386 return parseBoolean(value); 1387 } 1388 }, 1389 wal_ttl_seconds(0L,"Set the TTL for the WAL Files in seconds.") { parseValue(String value)1390 @Override public Object parseValue(String value) { 1391 return Long.parseLong(value); 1392 } 1393 }, 1394 wal_size_limit_MB(0L,"Set the size limit for the WAL Files\n" + 1395 "\tin MB.") { parseValue(String value)1396 @Override public Object parseValue(String value) { 1397 return Long.parseLong(value); 1398 } 1399 }, 1400 /* TODO(yhchiang): enable the following 1401 direct_reads(rocksdb::EnvOptions().use_direct_reads, 1402 "Allow direct I/O reads.") { 1403 @Override public Object parseValue(String value) { 1404 return parseBoolean(value); 1405 } 1406 }, 1407 direct_writes(rocksdb::EnvOptions().use_direct_reads, 1408 "Allow direct I/O reads.") { 1409 @Override public Object parseValue(String value) { 1410 return parseBoolean(value); 1411 } 1412 }, 1413 */ mmap_read(false, R)1414 mmap_read(false, 1415 "Allow reads to occur via mmap-ing files.") { 1416 @Override public Object parseValue(String value) { 1417 return parseBoolean(value); 1418 } 1419 }, mmap_write(false, R)1420 mmap_write(false, 1421 "Allow writes to occur via mmap-ing files.") { 1422 @Override public Object parseValue(String value) { 1423 return parseBoolean(value); 1424 } 1425 }, 1426 advise_random_on_open(defaultOptions_.adviseRandomOnOpen(), 1427 "Advise random access on table file open.") { parseValue(String value)1428 @Override public Object parseValue(String value) { 
1429 return parseBoolean(value); 1430 } 1431 }, 1432 compaction_fadvice("NORMAL", 1433 "Access pattern advice when a file is compacted.") { parseValue(String value)1434 @Override public Object parseValue(String value) { 1435 return value; 1436 } 1437 }, use_tailing_iterator(false, R)1438 use_tailing_iterator(false, 1439 "Use tailing iterator to access a series of keys instead of get.") { 1440 @Override public Object parseValue(String value) { 1441 return parseBoolean(value); 1442 } 1443 }, 1444 use_adaptive_mutex(defaultOptions_.useAdaptiveMutex(), 1445 "Use adaptive mutex.") { parseValue(String value)1446 @Override public Object parseValue(String value) { 1447 return parseBoolean(value); 1448 } 1449 }, 1450 bytes_per_sync(defaultOptions_.bytesPerSync(), 1451 "Allows OS to incrementally sync files to disk while they are\n" + 1452 "\tbeing written, in the background. Issue one request for every\n" + 1453 "\tbytes_per_sync written. 0 turns it off.") { parseValue(String value)1454 @Override public Object parseValue(String value) { 1455 return Long.parseLong(value); 1456 } 1457 }, filter_deletes(false,R + R)1458 filter_deletes(false," On true, deletes use bloom-filter and drop\n" + 1459 "\tthe delete if key not present.") { 1460 @Override public Object parseValue(String value) { 1461 return parseBoolean(value); 1462 } 1463 }, 1464 max_successive_merges(0,"Maximum number of successive merge\n" + 1465 "\toperations on a key in the memtable.") { parseValue(String value)1466 @Override public Object parseValue(String value) { 1467 return Integer.parseInt(value); 1468 } 1469 }, 1470 db(getTempDir("rocksdb-jni"), 1471 "Use the db with the following name.") { parseValue(String value)1472 @Override public Object parseValue(String value) { 1473 return value; 1474 } 1475 }, use_mem_env(false, R + R)1476 use_mem_env(false, "Use RocksMemEnv instead of default filesystem based\n" + 1477 "environment.") { 1478 @Override public Object parseValue(String value) { 1479 return 
parseBoolean(value); 1480 } 1481 }, java_comparator(null, R + R + R)1482 java_comparator(null, "Class name of a Java Comparator to use instead\n" + 1483 "\tof the default C++ ByteWiseComparatorImpl. Must be available on\n" + 1484 "\tthe classpath") { 1485 @Override 1486 protected Object parseValue(final String value) { 1487 try { 1488 final ComparatorOptions copt = new ComparatorOptions(); 1489 final Class<AbstractComparator> clsComparator = 1490 (Class<AbstractComparator>)Class.forName(value); 1491 final Constructor cstr = 1492 clsComparator.getConstructor(ComparatorOptions.class); 1493 return cstr.newInstance(copt); 1494 } catch(final ClassNotFoundException cnfe) { 1495 throw new IllegalArgumentException("Java Comparator '" + value + "'" + 1496 " not found on the classpath", cnfe); 1497 } catch(final NoSuchMethodException nsme) { 1498 throw new IllegalArgumentException("Java Comparator '" + value + "'" + 1499 " does not have a public ComparatorOptions constructor", nsme); 1500 } catch(final IllegalAccessException | InstantiationException 1501 | InvocationTargetException ie) { 1502 throw new IllegalArgumentException("Unable to construct Java" + 1503 " Comparator '" + value + "'", ie); 1504 } 1505 } 1506 }; 1507 Flag(Object defaultValue, String desc)1508 private Flag(Object defaultValue, String desc) { 1509 defaultValue_ = defaultValue; 1510 desc_ = desc; 1511 } 1512 getDefaultValue()1513 public Object getDefaultValue() { 1514 return defaultValue_; 1515 } 1516 desc()1517 public String desc() { 1518 return desc_; 1519 } 1520 parseBoolean(String value)1521 public boolean parseBoolean(String value) { 1522 if (value.equals("1")) { 1523 return true; 1524 } else if (value.equals("0")) { 1525 return false; 1526 } 1527 return Boolean.parseBoolean(value); 1528 } 1529 parseValue(String value)1530 protected abstract Object parseValue(String value); 1531 1532 private final Object defaultValue_; 1533 private final String desc_; 1534 } 1535 1536 private final static String 
DEFAULT_TEMP_DIR = "/tmp"; 1537 getTempDir(final String dirName)1538 private static String getTempDir(final String dirName) { 1539 try { 1540 return Files.createTempDirectory(dirName).toAbsolutePath().toString(); 1541 } catch(final IOException ioe) { 1542 System.err.println("Unable to create temp directory, defaulting to: " + 1543 DEFAULT_TEMP_DIR); 1544 return DEFAULT_TEMP_DIR + File.pathSeparator + dirName; 1545 } 1546 } 1547 1548 private static class RandomGenerator { 1549 private final byte[] data_; 1550 private int dataLength_; 1551 private int position_; 1552 private double compressionRatio_; 1553 Random rand_; 1554 RandomGenerator(long seed, double compressionRatio)1555 private RandomGenerator(long seed, double compressionRatio) { 1556 // We use a limited amount of data over and over again and ensure 1557 // that it is larger than the compression window (32KB), and also 1558 byte[] value = new byte[100]; 1559 // large enough to serve all typical value sizes we want to write. 1560 rand_ = new Random(seed); 1561 dataLength_ = value.length * 10000; 1562 data_ = new byte[dataLength_]; 1563 compressionRatio_ = compressionRatio; 1564 int pos = 0; 1565 while (pos < dataLength_) { 1566 compressibleBytes(value); 1567 System.arraycopy(value, 0, data_, pos, 1568 Math.min(value.length, dataLength_ - pos)); 1569 pos += value.length; 1570 } 1571 } 1572 compressibleBytes(byte[] value)1573 private void compressibleBytes(byte[] value) { 1574 int baseLength = value.length; 1575 if (compressionRatio_ < 1.0d) { 1576 baseLength = (int) (compressionRatio_ * value.length + 0.5); 1577 } 1578 if (baseLength <= 0) { 1579 baseLength = 1; 1580 } 1581 int pos; 1582 for (pos = 0; pos < baseLength; ++pos) { 1583 value[pos] = (byte) (' ' + rand_.nextInt(95)); // ' ' .. 
'~' 1584 } 1585 while (pos < value.length) { 1586 System.arraycopy(value, 0, value, pos, 1587 Math.min(baseLength, value.length - pos)); 1588 pos += baseLength; 1589 } 1590 } 1591 generate(byte[] value)1592 private void generate(byte[] value) { 1593 if (position_ + value.length > data_.length) { 1594 position_ = 0; 1595 assert(value.length <= data_.length); 1596 } 1597 position_ += value.length; 1598 System.arraycopy(data_, position_ - value.length, 1599 value, 0, value.length); 1600 } 1601 } 1602 isFinished()1603 boolean isFinished() { 1604 synchronized(finishLock_) { 1605 return isFinished_; 1606 } 1607 } 1608 setFinished(boolean flag)1609 void setFinished(boolean flag) { 1610 synchronized(finishLock_) { 1611 isFinished_ = flag; 1612 } 1613 } 1614 1615 RocksDB db_; 1616 final List<String> benchmarks_; 1617 final int num_; 1618 final int reads_; 1619 final int keySize_; 1620 final int valueSize_; 1621 final int threadNum_; 1622 final int writesPerSeconds_; 1623 final long randSeed_; 1624 final boolean useExisting_; 1625 final String databaseDir_; 1626 double compressionRatio_; 1627 RandomGenerator gen_; 1628 long startTime_; 1629 1630 // env 1631 boolean useMemenv_; 1632 1633 // memtable related 1634 final int maxWriteBufferNumber_; 1635 final int prefixSize_; 1636 final int keysPerPrefix_; 1637 final String memtable_; 1638 final long hashBucketCount_; 1639 1640 // sst format related 1641 boolean usePlainTable_; 1642 1643 Object finishLock_; 1644 boolean isFinished_; 1645 Map<Flag, Object> flags_; 1646 // as the scope of a static member equals to the scope of the problem, 1647 // we let its c++ pointer to be disposed in its finalizer. 1648 static Options defaultOptions_ = new Options(); 1649 static BlockBasedTableConfig defaultBlockBasedTableOptions_ = 1650 new BlockBasedTableConfig(); 1651 String compressionType_; 1652 CompressionType compression_; 1653 } 1654