1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifdef GFLAGS
11 #ifdef NUMA
12 #include <numa.h>
13 #include <numaif.h>
14 #endif
15 #ifndef OS_WIN
16 #include <unistd.h>
17 #endif
18 #include <fcntl.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <atomic>
23 #include <cinttypes>
24 #include <condition_variable>
25 #include <cstddef>
26 #include <memory>
27 #include <mutex>
28 #include <thread>
29 #include <unordered_map>
30
31 #include "db/db_impl/db_impl.h"
32 #include "db/malloc_stats.h"
33 #include "db/version_set.h"
34 #include "hdfs/env_hdfs.h"
35 #include "monitoring/histogram.h"
36 #include "monitoring/statistics.h"
37 #include "options/cf_options.h"
38 #include "port/port.h"
39 #include "port/stack_trace.h"
40 #include "rocksdb/cache.h"
41 #include "rocksdb/db.h"
42 #include "rocksdb/env.h"
43 #include "rocksdb/filter_policy.h"
44 #include "rocksdb/memtablerep.h"
45 #include "rocksdb/options.h"
46 #include "rocksdb/perf_context.h"
47 #include "rocksdb/persistent_cache.h"
48 #include "rocksdb/rate_limiter.h"
49 #include "rocksdb/slice.h"
50 #include "rocksdb/slice_transform.h"
51 #include "rocksdb/stats_history.h"
52 #include "rocksdb/utilities/object_registry.h"
53 #include "rocksdb/utilities/optimistic_transaction_db.h"
54 #include "rocksdb/utilities/options_util.h"
55 #include "rocksdb/utilities/sim_cache.h"
56 #include "rocksdb/utilities/transaction.h"
57 #include "rocksdb/utilities/transaction_db.h"
58 #include "rocksdb/write_batch.h"
59 #include "test_util/testutil.h"
60 #include "test_util/transaction_test_util.h"
61 #include "util/cast_util.h"
62 #include "util/compression.h"
63 #include "util/crc32c.h"
64 #include "util/gflags_compat.h"
65 #include "util/mutexlock.h"
66 #include "util/random.h"
67 #include "util/stderr_logger.h"
68 #include "util/string_util.h"
69 #include "util/xxhash.h"
70 #include "utilities/blob_db/blob_db.h"
71 #include "utilities/merge_operators.h"
72 #include "utilities/merge_operators/bytesxor.h"
73 #include "utilities/merge_operators/sortlist.h"
74 #include "utilities/persistent_cache/block_cache_tier.h"
75
76 #ifdef OS_WIN
77 #include <io.h> // open/close
78 #endif
79
80 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
81 using GFLAGS_NAMESPACE::RegisterFlagValidator;
82 using GFLAGS_NAMESPACE::SetUsageMessage;
83
84 DEFINE_string(
85 benchmarks,
86 "fillseq,"
87 "fillseqdeterministic,"
88 "fillsync,"
89 "fillrandom,"
90 "filluniquerandomdeterministic,"
91 "overwrite,"
92 "readrandom,"
93 "newiterator,"
94 "newiteratorwhilewriting,"
95 "seekrandom,"
96 "seekrandomwhilewriting,"
97 "seekrandomwhilemerging,"
98 "readseq,"
99 "readreverse,"
100 "compact,"
101 "compactall,"
102 "multireadrandom,"
103 "mixgraph,"
104 "readseq,"
105 "readtorowcache,"
106 "readtocache,"
107 "readreverse,"
108 "readwhilewriting,"
109 "readwhilemerging,"
110 "readwhilescanning,"
111 "readrandomwriterandom,"
112 "updaterandom,"
113 "xorupdaterandom,"
114 "randomwithverify,"
115 "fill100K,"
116 "crc32c,"
117 "xxhash,"
118 "compress,"
119 "uncompress,"
120 "acquireload,"
121 "fillseekseq,"
122 "randomtransaction,"
123 "randomreplacekeys,"
124 "timeseries,"
125 "getmergeoperands",
126
127 "Comma-separated list of operations to run in the specified"
128 " order. Available benchmarks:\n"
129 "\tfillseq -- write N values in sequential key"
130 " order in async mode\n"
131 "\tfillseqdeterministic -- write N values in the specified"
132 " key order and keep the shape of the LSM tree\n"
133 "\tfillrandom -- write N values in random key order in async"
134 " mode\n"
135 "\tfilluniquerandomdeterministic -- write N values in a random"
136 " key order and keep the shape of the LSM tree\n"
137 "\toverwrite -- overwrite N values in random key order in"
138 " async mode\n"
139 "\tfillsync -- write N/1000 values in random key order in "
140 "sync mode\n"
141 "\tfill100K -- write N/1000 100K values in random order in"
142 " async mode\n"
143 "\tdeleteseq -- delete N keys in sequential order\n"
144 "\tdeleterandom -- delete N keys in random order\n"
145 "\treadseq -- read N times sequentially\n"
146 "\treadtocache -- 1 thread reading database sequentially\n"
147 "\treadreverse -- read N times in reverse order\n"
148 "\treadrandom -- read N times in random order\n"
149 "\treadmissing -- read N missing keys in random order\n"
150 "\treadwhilewriting -- 1 writer, N threads doing random "
151 "reads\n"
152 "\treadwhilemerging -- 1 merger, N threads doing random "
153 "reads\n"
154 "\treadwhilescanning -- 1 thread doing full table scan, "
155 "N threads doing random reads\n"
156 "\treadrandomwriterandom -- N threads doing random-read, "
157 "random-write\n"
158 "\tupdaterandom -- N threads doing read-modify-write for random "
159 "keys\n"
160 "\txorupdaterandom -- N threads doing read-XOR-write for "
161 "random keys\n"
162 "\tappendrandom -- N threads doing read-modify-write with "
163 "growing values\n"
164 "\tmergerandom -- same as updaterandom/appendrandom using merge"
165 " operator. "
166 "Must be used with merge_operator\n"
167 "\treadrandommergerandom -- perform N random read-or-merge "
168 "operations. Must be used with merge_operator\n"
169 "\tnewiterator -- repeated iterator creation\n"
170 "\tseekrandom -- N random seeks, call Next seek_nexts times "
171 "per seek\n"
172 "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
173 "overwrite\n"
174 "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
175 "merge\n"
176 "\tcrc32c -- repeated crc32c of 4K of data\n"
177 "\txxhash -- repeated xxHash of 4K of data\n"
178 "\tacquireload -- load N*1000 times\n"
179 "\tfillseekseq -- write N values in sequential key, then read "
180 "them by seeking to each key\n"
181 "\trandomtransaction -- execute N random transactions and "
182 "verify correctness\n"
183 "\trandomreplacekeys -- randomly replaces N keys by deleting "
184 "the old version and putting the new version\n\n"
185 "\ttimeseries -- 1 writer generates time series data "
186 "and multiple readers doing random reads on id\n\n"
187 "Meta operations:\n"
188 "\tcompact -- Compact the entire DB; If multiple, randomly choose one\n"
189 "\tcompactall -- Compact the entire DB\n"
190 "\tstats -- Print DB stats\n"
191 "\tresetstats -- Reset DB stats\n"
192 "\tlevelstats -- Print the number of files and bytes per level\n"
193 "\tsstables -- Print sstable info\n"
194 "\theapprofile -- Dump a heap profile (if supported by this port)\n"
195 "\treplay -- replay the trace file specified with trace_file\n"
196 "\tgetmergeoperands -- Insert lots of merge records which are a list of "
197 "sorted ints for a key and then compare performance of lookup for another "
198 "key "
199 "by doing a Get followed by binary searching in the large sorted list vs "
200 "doing a GetMergeOperands and binary searching in the operands which are"
201 "sorted sub-lists. The MergeOperator used is sortlist.h\n");
202
// ---- Workload size, key/value shape and iteration flags ----
// Each DEFINE_<type>(name, default, help) below defines a gflags flag that the
// rest of the tool reads as FLAGS_<name>.

DEFINE_int64(num, 1000000, "Number of key/values to place in database");

DEFINE_int64(numdistinct, 1000,
             "Number of distinct keys to use. Used in RandomWithVerify to "
             "read/write on fewer keys so that gets are more likely to find the"
             " key and puts are more likely to update the same key");

// Negative default (-1) is a sentinel meaning "use FLAGS_num", per the help.
DEFINE_int64(merge_keys, -1,
             "Number of distinct keys to use for MergeRandom and "
             "ReadRandomMergeRandom. "
             "If negative, there will be FLAGS_num keys.");
DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");

DEFINE_int32(
    num_hot_column_families, 0,
    "Number of Hot Column Families. If more than 0, only write to this "
    "number of column families. After finishing all the writes to them, "
    "create new set of column families and insert to them. Only used "
    "when num_column_families > 1.");

DEFINE_string(column_family_distribution, "",
              "Comma-separated list of percentages, where the ith element "
              "indicates the probability of an op using the ith column family. "
              "The number of elements must be `num_hot_column_families` if "
              "specified; otherwise, it must be `num_column_families`. The "
              "sum of elements must be 100. E.g., if `num_column_families=4`, "
              "and `num_hot_column_families=0`, a valid list could be "
              "\"10,20,30,40\".");

// reads/deletes: a negative value falls back to FLAGS_num (per the help text).
DEFINE_int64(reads, -1, "Number of read operations to do. "
             "If negative, do FLAGS_num reads.");

DEFINE_int64(deletes, -1, "Number of delete operations to do. "
             "If negative, do FLAGS_num deletions.");

DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");

DEFINE_int64(seed, 0, "Seed base for random number generators. "
             "When 0 it is deterministic.");

DEFINE_int32(threads, 1, "Number of concurrent threads to run.");

DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
             " When 0 then num & reads determine the test duration");

DEFINE_string(value_size_distribution_type, "fixed",
              "Value size distribution type: fixed, uniform, normal");

DEFINE_int32(value_size, 100, "Size of each value in fixed distribution");
// Plain (non-flag) global that starts at the same default (100) as
// --value_size. NOTE(review): presumably reassigned from FLAGS_value_size or
// the selected value-size distribution elsewhere in this file -- confirm.
static unsigned int value_size = 100;

DEFINE_int32(value_size_min, 100, "Min size of random value");

DEFINE_int32(value_size_max, 102400, "Max size of random value");

DEFINE_int32(seek_nexts, 0,
             "How many times to call Next() after Seek() in "
             "fillseekseq, seekrandom, seekrandomwhilewriting and "
             "seekrandomwhilemerging");

DEFINE_bool(reverse_iterator, false,
            "When true use Prev rather than Next for iterators that do "
            "Seek and then Next");

DEFINE_int64(max_scan_distance, 0,
             "Used to define iterate_upper_bound (or iterate_lower_bound "
             "if FLAGS_reverse_iterator is set to true) when value is nonzero");

DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");

DEFINE_int64(batch_size, 1, "Batch size");
274
// gflags validator for --key_size. Every value is currently accepted, so both
// the flag name and the value are deliberately ignored.
static bool ValidateKeySize(const char* /*flagname*/, int32_t /*value*/) {
  constexpr bool kAcceptAnyKeySize = true;
  return kAcceptAnyKeySize;
}
278
// gflags validator: accepts a uint64 flag value only if it fits in uint32_t.
//
// flagname: the flag being validated (used in the error message).
// value:    the parsed flag value.
// Returns true if value <= UINT32_MAX; otherwise prints an error to stderr and
// returns false so gflags rejects the value.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    // PRIu64 prints the full 64-bit value. The previous "%lu" with an
    // (unsigned long) cast truncated it on LLP64 platforms (e.g. Windows),
    // where unsigned long is only 32 bits wide.
    fprintf(stderr, "Invalid value for --%s: %" PRIu64 ", overflow\n",
            flagname, value);
    return false;
  }
  return true;
}
287
// ---- Key size, multi-DB, value compressibility and memtable sizing ----
// Flags whose default is ROCKSDB_NAMESPACE::Options().<field> start at the
// library's own default for that option.

DEFINE_int32(key_size, 16, "size of each key");

DEFINE_int32(num_multi_db, 0,
             "Number of DBs used in the benchmark. 0 means single DB.");

DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
              " to this fraction of their original size after compression");

DEFINE_double(read_random_exp_range, 0.0,
              "Read random's key will be generated using distribution of "
              "num * exp(-r) where r is uniform number from 0 to this value. "
              "The larger the number is, the more skewed the reads are. "
              "Only used in readrandom and multireadrandom benchmarks.");

DEFINE_bool(histogram, false, "Print histogram of operation timings");

DEFINE_bool(enable_numa, false,
            "Make operations aware of NUMA architecture and bind memory "
            "and cpus corresponding to nodes together. In NUMA, memory "
            "in same node as CPUs are closer when compared to memory in "
            "other nodes. Reads can be faster when the process is bound to "
            "CPU and memory of same node. Use \"$numactl --hardware\" command "
            "to see NUMA memory architecture.");

DEFINE_int64(db_write_buffer_size,
             ROCKSDB_NAMESPACE::Options().db_write_buffer_size,
             "Number of bytes to buffer in all memtables before compacting");

DEFINE_bool(cost_write_buffer_to_cache, false,
            "The usage of memtable is costed to the block cache");

DEFINE_int64(write_buffer_size, ROCKSDB_NAMESPACE::Options().write_buffer_size,
             "Number of bytes to buffer in memtable before compacting");

DEFINE_int32(max_write_buffer_number,
             ROCKSDB_NAMESPACE::Options().max_write_buffer_number,
             "The number of in-memory memtables. Each memtable is of size"
             " write_buffer_size bytes.");
327 DEFINE_int32(min_write_buffer_number_to_merge,
328 ROCKSDB_NAMESPACE::Options().min_write_buffer_number_to_merge,
329 "The minimum number of write buffers that will be merged together"
330 "before writing to storage. This is cheap because it is an"
331 "in-memory merge. If this feature is not enabled, then all these"
332 "write buffers are flushed to L0 as separate files and this "
333 "increases read amplification because a get request has to check"
334 " in all of these files. Also, an in-memory merge may result in"
335 " writing less data to storage if there are duplicate records "
336 " in each of these individual write buffers.");
337
// ---- Write-buffer history retention and background job sizing ----

DEFINE_int32(max_write_buffer_number_to_maintain,
             ROCKSDB_NAMESPACE::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed. If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

// NOTE(review): this help text repeats the count-based flag's wording,
// including the "-1 -> 'max_write_buffer_number'" sentence, even though this
// flag is a size in bytes. Confirm the -1 semantics against the Options
// documentation for max_write_buffer_size_to_maintain.
DEFINE_int64(max_write_buffer_size_to_maintain,
             ROCKSDB_NAMESPACE::Options().max_write_buffer_size_to_maintain,
             "The total maximum size of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed. If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

DEFINE_int32(max_background_jobs,
             ROCKSDB_NAMESPACE::Options().max_background_jobs,
             "The maximum number of concurrent background jobs that can occur "
             "in parallel.");

DEFINE_int32(num_bottom_pri_threads, 0,
             "The number of threads in the bottom-priority thread pool (used "
             "by universal compaction only).");
374
375 DEFINE_int32(num_high_pri_threads, 0,
376 "The maximum number of concurrent background compactions"
377 " that can occur in parallel.");
378
379 DEFINE_int32(num_low_pri_threads, 0,
380 "The maximum number of concurrent background compactions"
381 " that can occur in parallel.");
382
// ---- Compaction concurrency, style and block-cache sizing ----

DEFINE_int32(max_background_compactions,
             ROCKSDB_NAMESPACE::Options().max_background_compactions,
             "The maximum number of concurrent background compactions"
             " that can occur in parallel.");

DEFINE_int32(base_background_compactions, -1, "DEPRECATED");

DEFINE_uint64(subcompactions, 1,
              "Maximum number of subcompactions to divide L0-L1 compactions "
              "into.");
// Registers ValidateUint32Range for --subcompactions at static-initialization
// time, so values above UINT32_MAX are rejected. This must follow the
// DEFINE_uint64 above, which declares FLAGS_subcompactions. The bool itself is
// never read; __unused__ silences the warning.
static const bool FLAGS_subcompactions_dummy
    __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_subcompactions,
                                                        &ValidateUint32Range);

DEFINE_int32(max_background_flushes,
             ROCKSDB_NAMESPACE::Options().max_background_flushes,
             "The maximum number of concurrent background flushes"
             " that can occur in parallel.");

// Enum-typed companion of the int32 --compaction_style flag.
// NOTE(review): presumably assigned from FLAGS_compaction_style after flag
// parsing -- confirm.
static ROCKSDB_NAMESPACE::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style,
             (int32_t)ROCKSDB_NAMESPACE::Options().compaction_style,
             "style of compaction: level-based, universal and fifo");

// Enum-typed companion of the int32 --compaction_pri flag (see note above).
static ROCKSDB_NAMESPACE::CompactionPri FLAGS_compaction_pri_e;
DEFINE_int32(compaction_pri,
             (int32_t)ROCKSDB_NAMESPACE::Options().compaction_pri,
             "priority of files to compaction: by size or by data age");

DEFINE_int32(universal_size_ratio, 0,
             "Percentage flexibility while comparing file size"
             " (for universal compaction only).");

DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
             " single compaction run (for universal compaction only).");

DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");

DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");

DEFINE_int32(universal_compression_size_percent, -1,
             "The percentage of the database to compress for universal "
             "compaction. -1 means compress everything.");

DEFINE_bool(universal_allow_trivial_move, false,
            "Allow trivial move in universal compaction.");

DEFINE_int64(cache_size, 8 << 20,  // 8MB
             "Number of bytes to use as a cache of uncompressed data");

// ValidateCacheNumshardbits (defined later in this file) rejects values >= 20.
// NOTE(review): its registration for this flag is not visible in this section.
DEFINE_int32(cache_numshardbits, 6,
             "Number of shards for the block cache"
             " is 2 ** cache_numshardbits. Negative means use default settings."
             " This is applied only if FLAGS_cache_size is non-negative.");

DEFINE_double(cache_high_pri_pool_ratio, 0.0,
              "Ratio of block cache reserve for high pri blocks. "
              "If > 0.0, we also enable "
              "cache_index_and_filter_blocks_with_high_priority.");

DEFINE_bool(use_clock_cache, false,
            "Replace default LRU block cache with clock cache.");
448 DEFINE_int64(simcache_size, -1,
449 "Number of bytes to use as a simcache of "
450 "uncompressed data. Nagative value disables simcache.");
451
// ---- Index/filter block caching and partitioning ----

DEFINE_bool(cache_index_and_filter_blocks, false,
            "Cache index/filter blocks in block cache.");

DEFINE_bool(partition_index_and_filters, false,
            "Partition index and filter blocks.");

DEFINE_bool(partition_index, false, "Partition index blocks");

// Default tracks BlockBasedTableOptions().metadata_block_size.
DEFINE_int64(metadata_block_size,
             ROCKSDB_NAMESPACE::BlockBasedTableOptions().metadata_block_size,
             "Max partition size when partitioning index/filters");
463
// The default reduces the overhead of reading time with flash. With HDD,
// which offers much less throughput, this number is better set to 1.
DEFINE_int32(ops_between_duration_checks, 1000,
             "Check duration limit every x ops");

DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
            "Pin index/filter blocks of L0 files in block cache.");

DEFINE_bool(
    pin_top_level_index_and_filter, false,
    "Pin top-level index of partitioned index/filter blocks in block cache.");

// Block-based table layout flags. Defaults come from
// BlockBasedTableOptions(); the explicit static_casts convert those fields to
// the int32 flag type.
DEFINE_int32(block_size,
             static_cast<int32_t>(
                 ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_size),
             "Number of bytes in a block.");

DEFINE_int32(format_version,
             static_cast<int32_t>(
                 ROCKSDB_NAMESPACE::BlockBasedTableOptions().format_version),
             "Format version of SST files.");

DEFINE_int32(block_restart_interval,
             ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in data block.");

DEFINE_int32(
    index_block_restart_interval,
    ROCKSDB_NAMESPACE::BlockBasedTableOptions().index_block_restart_interval,
    "Number of keys between restart points "
    "for delta encoding of keys in index block.");

DEFINE_int32(read_amp_bytes_per_bit,
             ROCKSDB_NAMESPACE::BlockBasedTableOptions().read_amp_bytes_per_bit,
             "Number of bytes per bit to be used in block read-amp bitmap");

DEFINE_bool(
    enable_index_compression,
    ROCKSDB_NAMESPACE::BlockBasedTableOptions().enable_index_compression,
    "Compress the index block");

DEFINE_bool(block_align,
            ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_align,
            "Align data blocks on page size");
510 DEFINE_bool(use_data_block_hash_index, false,
511 "if use kDataBlockBinaryAndHash "
512 "instead of kDataBlockBinarySearch. "
513 "This is valid if only we use BlockTable");
514
// ---- Hash index ratio, caches, file handles, bloom filters, DB reuse ----

// Only meaningful together with --use_data_block_hash_index (per help text).
DEFINE_double(data_block_hash_table_util_ratio, 0.75,
              "util ratio for data block hash index table. "
              "This is only valid if use_data_block_hash_index is "
              "set to true");

DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data.");

DEFINE_int64(row_cache_size, 0,
             "Number of bytes to use as a cache of individual rows"
             " (0 = disabled).");

DEFINE_int32(open_files, ROCKSDB_NAMESPACE::Options().max_open_files,
             "Maximum number of files to keep open at the same time"
             " (use default if == 0)");

DEFINE_int32(file_opening_threads,
             ROCKSDB_NAMESPACE::Options().max_file_opening_threads,
             "If open_files is set to -1, this option set the number of "
             "threads that will be used to open files during DB::Open()");

DEFINE_bool(new_table_reader_for_compaction_inputs, true,
            "If true, uses a separate file handle for compaction inputs");

DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");

DEFINE_int32(log_readahead_size, 0, "WAL and manifest readahead size");

DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
             "Maximum windows randomaccess buffer size");

DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
             "Maximum write buffer for Writable File");

DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
             " use default settings.");
DEFINE_double(memtable_bloom_size_ratio, 0,
              "Ratio of memtable size used for bloom filter. 0 means no bloom "
              "filter.");
DEFINE_bool(memtable_whole_key_filtering, false,
            "Try to use whole key bloom filter in memtables.");
DEFINE_bool(memtable_use_huge_page, false,
            "Try to use huge page in memtables.");

DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
            " database. If you set this flag and also specify a benchmark that"
            " wants a fresh database, that benchmark will fail.");

// Per the help text, requires --use_existing_db and makes --num ignored.
DEFINE_bool(use_existing_keys, false,
            "If true, uses existing keys in the DB, "
            "rather than generating new ones. This involves some startup "
            "latency to load all keys into memory. It is supported for the "
            "same read/overwrite benchmarks as `-use_existing_db=true`, which "
            "must also be set for this flag to be enabled. When this flag is "
            "set, the value for `-num` will be ignored.");

DEFINE_bool(show_table_properties, false,
            "If true, then per-level table"
            " properties will be printed on every stats-interval when"
            " stats_interval is set and stats_per_interval is on.");

DEFINE_string(db, "", "Use the db with the following name.");

// Read cache flags

DEFINE_string(read_cache_path, "",
              "If not empty string, a read cache will be used in this path");

// Default: 4 GiB (4LL keeps the product in 64-bit arithmetic).
DEFINE_int64(read_cache_size, 4LL * 1024 * 1024 * 1024,
             "Maximum size of the read cache");

DEFINE_bool(read_cache_direct_write, true,
            "Whether to use Direct IO for writing to the read cache");

DEFINE_bool(read_cache_direct_read, true,
            "Whether to use Direct IO for reading from read cache");

DEFINE_bool(use_keep_filter, false, "Whether to use a noop compaction filter");
593
// gflags validator for --cache_numshardbits: the shard-bit count must stay
// below 20. Negative values are accepted (the flag's help text treats them as
// "use default settings").
//
// flagname: flag being validated, echoed in the error message.
// value:    parsed flag value.
// Returns true when value < 20; otherwise reports the problem on stderr and
// returns false.
static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  const bool within_limit = value < 20;
  if (!within_limit) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n", flagname,
            value);
  }
  return within_limit;
}
602
// ---- Checksums and statistics ----

DEFINE_bool(verify_checksum, true,
            "Verify checksum for every block read"
            " from storage");

DEFINE_bool(statistics, false, "Database statistics");
DEFINE_int32(stats_level, ROCKSDB_NAMESPACE::StatsLevel::kExceptDetailedTimers,
             "stats level for statistics");
DEFINE_string(statistics_string, "", "Serialized statistics string");
// File-level Statistics handle (the `class` keyword here is a redundant
// elaborated-type-specifier). NOTE(review): presumably created from
// --statistics / --statistics_string elsewhere in this file -- confirm.
static class std::shared_ptr<ROCKSDB_NAMESPACE::Statistics> dbstats;
612
613 DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
614 " --num reads.");
615
// ---- Write durability, WAL and LSM level-shape flags ----

DEFINE_bool(finish_after_writes, false, "Write thread terminates after all writes are finished");

DEFINE_bool(sync, false, "Sync all writes to disk");

DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");

DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");

DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");

DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench",
              "Truth key/values used when using verify");

DEFINE_int32(num_levels, 7, "The total number of levels");

DEFINE_int64(target_file_size_base,
             ROCKSDB_NAMESPACE::Options().target_file_size_base,
             "Target file size at level-1");

DEFINE_int32(target_file_size_multiplier,
             ROCKSDB_NAMESPACE::Options().target_file_size_multiplier,
             "A multiplier to compute target level-N file size (N >= 2)");

DEFINE_uint64(max_bytes_for_level_base,
              ROCKSDB_NAMESPACE::Options().max_bytes_for_level_base,
              "Max bytes for level-1");

DEFINE_bool(level_compaction_dynamic_level_bytes, false,
            "Whether level size base is dynamic");

DEFINE_double(max_bytes_for_level_multiplier, 10,
              "A multiplier to compute max bytes for level-N (N >= 2)");

// Vector form of --max_bytes_for_level_multiplier_additional.
// NOTE(review): presumably filled by parsing the string flag after flag
// parsing -- confirm.
static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
DEFINE_string(max_bytes_for_level_multiplier_additional, "",
              "A vector that specifies additional fanout per level");

DEFINE_int32(level0_stop_writes_trigger,
             ROCKSDB_NAMESPACE::Options().level0_stop_writes_trigger,
             "Number of files in level-0"
             " that will trigger put stop.");

DEFINE_int32(level0_slowdown_writes_trigger,
             ROCKSDB_NAMESPACE::Options().level0_slowdown_writes_trigger,
             "Number of files in level-0"
             " that will slow down writes.");

DEFINE_int32(level0_file_num_compaction_trigger,
             ROCKSDB_NAMESPACE::Options().level0_file_num_compaction_trigger,
             "Number of files in level-0"
             " when compactions start");
667
// gflags validator for percentage flags: accepts only the open interval
// (0, 100), i.e. 1..99 inclusive.
//
// flagname: flag being validated, echoed in the error message.
// value:    parsed flag value.
// Returns true for 1..99; otherwise prints an error to stderr and returns
// false.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  const bool strictly_between = value > 0 && value < 100;
  if (strictly_between) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n", flagname,
          value);
  return false;
}
// ---- Operation-mix percentages for mixed read/write/merge/delete workloads.
// ValidateInt32Percent (above) accepts only 1..99.
// NOTE(review): its registration for these flags is not visible here.
// For RandomWithVerify, readwritepercent + deletepercent must stay below 100
// so the derived write percentage is positive (per deletepercent's help).
DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
             " as percentage) for the ReadRandomWriteRandom workload. The "
             "default value 90 means 90% operations out of all reads and writes"
             " operations are reads. In other words, 9 gets for every 1 put.");

DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
             " as percentage) for the ReadRandomMergeRandom workload. The"
             " default value 70 means 70% out of all read and merge operations"
             " are merges. In other words, 7 merges for every 3 gets.");

DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
             "deletes (used in RandomWithVerify only). RandomWithVerify "
             "calculates writepercent as (100 - FLAGS_readwritepercent - "
             "deletepercent), so deletepercent must be smaller than (100 - "
             "FLAGS_readwritepercent)");
691
692 DEFINE_bool(optimize_filters_for_hits, false,
693 "Optimizes bloom filters for workloads for most lookups return "
694 "a value. For now this doesn't create bloom filters for the max "
695 "level of the LSM to reduce metadata that should fit in RAM. ");
696
// Kept only so existing command lines that pass it still parse (per its help).
DEFINE_uint64(delete_obsolete_files_period_micros, 0,
              "Ignored. Left here for backward compatibility");

// ---- Range-deletion (DeleteRange / range tombstone) flags ----

DEFINE_int64(writes_before_delete_range, 0,
             "Number of writes before DeleteRange is called regularly.");

DEFINE_int64(writes_per_range_tombstone, 0,
             "Number of writes between range tombstones");

DEFINE_int64(range_tombstone_width, 100, "Number of keys in tombstone's range");

DEFINE_int64(max_num_range_tombstones, 0,
             "Maximum number of range tombstones "
             "to insert.");

DEFINE_bool(expand_range_tombstones, false,
            "Expand range tombstone into sequential regular tombstones.");
714
715 #ifndef ROCKSDB_LITE
716 // Transactions Options
717 DEFINE_bool(optimistic_transaction_db, false,
718 "Open a OptimisticTransactionDB instance. "
719 "Required for randomtransaction benchmark.");
720
721 DEFINE_bool(transaction_db, false,
722 "Open a TransactionDB instance. "
723 "Required for randomtransaction benchmark.");
724
725 DEFINE_uint64(transaction_sets, 2,
726 "Number of keys each transaction will "
727 "modify (use in RandomTransaction only). Max: 9999");
728
729 DEFINE_bool(transaction_set_snapshot, false,
730 "Setting to true will have each transaction call SetSnapshot()"
731 " upon creation.");
732
733 DEFINE_int32(transaction_sleep, 0,
734 "Max microseconds to sleep in between "
735 "reading and writing a value (used in RandomTransaction only). ");
736
737 DEFINE_uint64(transaction_lock_timeout, 100,
738 "If using a transaction_db, specifies the lock wait timeout in"
739 " milliseconds before failing a transaction waiting on a lock");
740 DEFINE_string(
741 options_file, "",
742 "The path to a RocksDB options file. If specified, then db_bench will "
743 "run with the RocksDB options in the default column family of the "
744 "specified options file. "
745 "Note that with this setting, db_bench will ONLY accept the following "
746 "RocksDB options related command-line arguments, all other arguments "
747 "that are related to RocksDB options will be ignored:\n"
748 "\t--use_existing_db\n"
749 "\t--use_existing_keys\n"
750 "\t--statistics\n"
751 "\t--row_cache_size\n"
752 "\t--row_cache_numshardbits\n"
753 "\t--enable_io_prio\n"
754 "\t--dump_malloc_stats\n"
755 "\t--num_multi_db\n");
756
757 // FIFO Compaction Options
758 DEFINE_uint64(fifo_compaction_max_table_files_size_mb, 0,
759 "The limit of total table file sizes to trigger FIFO compaction");
760
761 DEFINE_bool(fifo_compaction_allow_compaction, true,
762 "Allow compaction in FIFO compaction.");
763
764 DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds.");
765
// Blob DB Options
//
// Most defaults below come from a default-constructed
// blob_db::BlobDBOptions, so they track the library's own defaults.
DEFINE_bool(use_blob_db, false,
            "Open a BlobDB instance. "
            "Required for large value benchmark.");

DEFINE_bool(
    blob_db_enable_gc,
    ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().enable_garbage_collection,
    "Enable BlobDB garbage collection.");

DEFINE_double(
    blob_db_gc_cutoff,
    ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().garbage_collection_cutoff,
    "Cutoff ratio for BlobDB garbage collection.");

DEFINE_bool(blob_db_is_fifo,
            ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().is_fifo,
            "Enable FIFO eviction strategy in BlobDB.");

DEFINE_uint64(blob_db_max_db_size,
              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().max_db_size,
              "Max size limit of the directory where blob files are stored.");

// Benchmark-side data-generation setting (its default is a literal rather
// than a BlobDBOptions field); 0 means generated data carries no TTL.
DEFINE_uint64(
    blob_db_max_ttl_range, 0,
    "TTL range to generate BlobDB data (in seconds). 0 means no TTL.");

DEFINE_uint64(blob_db_ttl_range_secs,
              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().ttl_range_secs,
              "TTL bucket size to use when creating blob files.");

DEFINE_uint64(blob_db_min_blob_size,
              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size,
              "Smallest blob to store in a file. Blobs smaller than this "
              "will be inlined with the key in the LSM tree.");

DEFINE_uint64(blob_db_bytes_per_sync,
              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync,
              "Bytes to sync blob file at.");

DEFINE_uint64(blob_db_file_size,
              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().blob_file_size,
              "Target size of each blob file.");

DEFINE_string(blob_db_compression_type, "snappy",
              "Algorithm to use to compress blob in blob file");
// Enum form of --blob_db_compression_type. Initialized to Snappy to match the
// string flag's default. NOTE(review): presumably overwritten from the string
// flag after command-line parsing (not shown in this section) -- confirm.
static enum ROCKSDB_NAMESPACE::CompressionType
    FLAGS_blob_db_compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression;
814
// Secondary DB instance Options
//
// A secondary instance opens a DB that another (primary) process may be
// writing to, and periodically catches up with it.
DEFINE_bool(use_secondary_db, false,
            "Open a RocksDB secondary instance. A primary instance can be "
            "running in another db_bench process.");

DEFINE_string(secondary_path, "",
              "Path to a directory used by the secondary instance to store "
              "private files, e.g. info log.");

// Catch-up period, in seconds.
DEFINE_int32(secondary_update_interval, 5,
             "Secondary instance attempts to catch up with the primary every "
             "secondary_update_interval seconds.");
827
828 #endif // ROCKSDB_LITE
829
830 DEFINE_bool(report_bg_io_stats, false,
831 "Measure times spents on I/Os while in compactions. ");
832
833 DEFINE_bool(use_stderr_info_logger, false,
834 "Write info logs to stderr instead of to LOG file. ");
835
836 DEFINE_string(trace_file, "", "Trace workload to a file. ");
837
838 DEFINE_int32(trace_replay_fast_forward, 1,
839 "Fast forward trace replay, must >= 1. ");
840 DEFINE_int32(block_cache_trace_sampling_frequency, 1,
841 "Block cache trace sampling frequency, termed s. It uses spatial "
842 "downsampling and samples accesses to one out of s blocks.");
843 DEFINE_int64(
844 block_cache_trace_max_trace_file_size_in_bytes,
845 uint64_t{64} * 1024 * 1024 * 1024,
846 "The maximum block cache trace file size in bytes. Block cache accesses "
847 "will not be logged if the trace file size exceeds this threshold. Default "
848 "is 64 GB.");
849 DEFINE_string(block_cache_trace_file, "", "Block cache trace file path.");
850 DEFINE_int32(trace_replay_threads, 1,
851 "The number of threads to replay, must >=1.");
852
StringToCompressionType(const char * ctype)853 static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
854 const char* ctype) {
855 assert(ctype);
856
857 if (!strcasecmp(ctype, "none"))
858 return ROCKSDB_NAMESPACE::kNoCompression;
859 else if (!strcasecmp(ctype, "snappy"))
860 return ROCKSDB_NAMESPACE::kSnappyCompression;
861 else if (!strcasecmp(ctype, "zlib"))
862 return ROCKSDB_NAMESPACE::kZlibCompression;
863 else if (!strcasecmp(ctype, "bzip2"))
864 return ROCKSDB_NAMESPACE::kBZip2Compression;
865 else if (!strcasecmp(ctype, "lz4"))
866 return ROCKSDB_NAMESPACE::kLZ4Compression;
867 else if (!strcasecmp(ctype, "lz4hc"))
868 return ROCKSDB_NAMESPACE::kLZ4HCCompression;
869 else if (!strcasecmp(ctype, "xpress"))
870 return ROCKSDB_NAMESPACE::kXpressCompression;
871 else if (!strcasecmp(ctype, "zstd"))
872 return ROCKSDB_NAMESPACE::kZSTD;
873
874 fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
875 return ROCKSDB_NAMESPACE::kSnappyCompression; // default value
876 }
877
ColumnFamilyName(size_t i)878 static std::string ColumnFamilyName(size_t i) {
879 if (i == 0) {
880 return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
881 } else {
882 char name[100];
883 snprintf(name, sizeof(name), "column_family_name_%06zu", i);
884 return std::string(name);
885 }
886 }
887
// Compression settings for the database.
DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
// Enum form of --compression_type, initialized to Snappy to match the string
// default. NOTE(review): presumably filled in from the string flag via
// StringToCompressionType after parsing (not shown here) -- confirm.
static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_compression_type_e =
    ROCKSDB_NAMESPACE::kSnappyCompression;

DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression");

// Defaults below come from a default-constructed CompressionOptions.
DEFINE_int32(compression_level, ROCKSDB_NAMESPACE::CompressionOptions().level,
             "Compression level. The meaning of this value is library-"
             "dependent. If unset, we try to use the default for the library "
             "specified in `--compression_type`");

DEFINE_int32(compression_max_dict_bytes,
             ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes,
             "Maximum size of dictionary used to prime the compression "
             "library.");

DEFINE_int32(compression_zstd_max_train_bytes,
             ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes,
             "Maximum size of training data passed to zstd's dictionary "
             "trainer.");

// -1 (the default) means: compress every level with --compression_type.
DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
             " from this level. Levels with number < min_level_to_compress are"
             " not compressed. Otherwise, apply compression_type to "
             "all levels.");
914
// gflags validator for --table_cache_numshardbits: accepts 1..20 inclusive
// and prints a diagnostic to stderr for anything else.
static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  const bool in_range = value > 0 && value <= 20;
  if (!in_range) {
    fprintf(stderr, "Invalid value for --%s: %d, must be 0 < val <= 20\n",
            flagname, value);
  }
  return in_range;
}
// Checked by ValidateTableCacheNumshardbits (registered with the other flag
// validators further below), which requires 0 < value <= 20.
DEFINE_int32(table_cache_numshardbits, 4, "");

#ifndef ROCKSDB_LITE
DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive"
              " with --hdfs.");
#endif // ROCKSDB_LITE
DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
              " --env_uri.");

// NOTE(review): presumably keeps alive an Env created from --env_uri/--hdfs
// so FLAGS_env can point at it; confirm at the assignment site.
static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard;

// Env used by the benchmark; starts out as the process-wide default Env.
static ROCKSDB_NAMESPACE::Env* FLAGS_env = ROCKSDB_NAMESPACE::Env::Default();
936
937 DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
938 "this is greater than zero. When 0 the interval grows over time.");
939
940 DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This "
941 "overrides stats_interval when both are > 0.");
942
943 DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
944 " this is greater than 0.");
945
946 DEFINE_int64(report_interval_seconds, 0,
947 "If greater than zero, it will write simple stats in CVS format "
948 "to --report_file every N seconds");
949
950 DEFINE_string(report_file, "report.csv",
951 "Filename where some simple stats are reported to (if "
952 "--report_interval_seconds is bigger than 0)");
953
954 DEFINE_int32(thread_status_per_interval, 0,
955 "Takes and report a snapshot of the current status of each thread"
956 " when this is greater than 0.");
957
958 DEFINE_int32(perf_level, ROCKSDB_NAMESPACE::PerfLevel::kDisable,
959 "Level of perf collection");
960
// gflags validator for the (deprecated) rate-limit flags: rejects values
// below -1e-10 and prints a diagnostic to stderr. The small tolerance
// presumably absorbs floating-point noise around 0.0. NaN is not less than
// the threshold, so it is accepted.
static bool ValidateRateLimit(const char* flagname, double value) {
  constexpr double kEpsilon = 1e-10;
  const bool is_negative = value < -kEpsilon;
  if (!is_negative) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
          flagname, value);
  return false;
}
970 DEFINE_double(soft_rate_limit, 0.0, "DEPRECATED");
971
972 DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED");
973
974 DEFINE_uint64(soft_pending_compaction_bytes_limit, 64ull * 1024 * 1024 * 1024,
975 "Slowdown writes if pending compaction bytes exceed this number");
976
977 DEFINE_uint64(hard_pending_compaction_bytes_limit, 128ull * 1024 * 1024 * 1024,
978 "Stop writes if pending compaction bytes exceed this number");
979
980 DEFINE_uint64(delayed_write_rate, 8388608u,
981 "Limited bytes allowed to DB when soft_rate_limit or "
982 "level0_slowdown_writes_trigger triggers");
983
984 DEFINE_bool(enable_pipelined_write, true,
985 "Allow WAL and memtable writes to be pipelined");
986
987 DEFINE_bool(unordered_write, false,
988 "Allow WAL and memtable writes to be pipelined");
989
990 DEFINE_bool(allow_concurrent_memtable_write, true,
991 "Allow multi-writers to update mem tables in parallel.");
992
993 DEFINE_bool(inplace_update_support,
994 ROCKSDB_NAMESPACE::Options().inplace_update_support,
995 "Support in-place memtable update for smaller or same-size values");
996
997 DEFINE_uint64(inplace_update_num_locks,
998 ROCKSDB_NAMESPACE::Options().inplace_update_num_locks,
999 "Number of RW locks to protect in-place memtable updates");
1000
// Write-thread yielding behaviour.
DEFINE_bool(enable_write_thread_adaptive_yield, true,
            "Use a yielding spin loop for brief writer thread waits.");

DEFINE_uint64(
    write_thread_max_yield_usec, 100,
    "Maximum microseconds for enable_write_thread_adaptive_yield operation.");

DEFINE_uint64(write_thread_slow_yield_usec, 3,
              "The threshold at which a slow yield is considered a signal that "
              "other processes or threads want the core.");

// Tied to the deprecated --hard_rate_limit.
DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
             "When hard_rate_limit is set then this is the max time a put will"
             " be stalled.");

// Background I/O rate limiter (options.rate_limiter).
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");

DEFINE_bool(rate_limiter_auto_tuned, false,
            "Enable dynamic adjustment of rate limit according to demand for "
            "background I/O");


// Sine-wave write rate: when --sine_write_rate is set, the write rate limit
// follows f(x) = A sin(bx + c) + d with A/b/c/d given by --sine_a..--sine_d,
// recomputed every --sine_write_rate_interval_milliseconds.
DEFINE_bool(sine_write_rate, false,
            "Use a sine wave write_rate_limit");

DEFINE_uint64(sine_write_rate_interval_milliseconds, 10000,
              "Interval of which the sine wave write_rate_limit is recalculated");

DEFINE_double(sine_a, 1,
             "A in f(x) = A sin(bx + c) + d");

DEFINE_double(sine_b, 1,
             "B in f(x) = A sin(bx + c) + d");

DEFINE_double(sine_c, 0,
             "C in f(x) = A sin(bx + c) + d");

DEFINE_double(sine_d, 1,
             "D in f(x) = A sin(bx + c) + d");

DEFINE_bool(rate_limit_bg_reads, false,
            "Use options.rate_limiter on compaction reads");

// Foreground write throttling done by db_bench itself (bytes/second);
// 0 disables it.
DEFINE_uint64(
    benchmark_write_rate_limit, 0,
    "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
    "is the global rate in bytes/second.");
1048
1049 // the parameters of mix_graph
1050 DEFINE_double(keyrange_dist_a, 0.0,
1051 "The parameter 'a' of prefix average access distribution "
1052 "f(x)=a*exp(b*x)+c*exp(d*x)");
1053 DEFINE_double(keyrange_dist_b, 0.0,
1054 "The parameter 'b' of prefix average access distribution "
1055 "f(x)=a*exp(b*x)+c*exp(d*x)");
1056 DEFINE_double(keyrange_dist_c, 0.0,
1057 "The parameter 'c' of prefix average access distribution"
1058 "f(x)=a*exp(b*x)+c*exp(d*x)");
1059 DEFINE_double(keyrange_dist_d, 0.0,
1060 "The parameter 'd' of prefix average access distribution"
1061 "f(x)=a*exp(b*x)+c*exp(d*x)");
1062 DEFINE_int64(keyrange_num, 1,
1063 "The number of key ranges that are in the same prefix "
1064 "group, each prefix range will have its key acccess "
1065 "distribution");
1066 DEFINE_double(key_dist_a, 0.0,
1067 "The parameter 'a' of key access distribution model "
1068 "f(x)=a*x^b");
1069 DEFINE_double(key_dist_b, 0.0,
1070 "The parameter 'b' of key access distribution model "
1071 "f(x)=a*x^b");
1072 DEFINE_double(value_theta, 0.0,
1073 "The parameter 'theta' of Generized Pareto Distribution "
1074 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
1075 DEFINE_double(value_k, 0.0,
1076 "The parameter 'k' of Generized Pareto Distribution "
1077 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
1078 DEFINE_double(value_sigma, 0.0,
1079 "The parameter 'theta' of Generized Pareto Distribution "
1080 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
1081 DEFINE_double(iter_theta, 0.0,
1082 "The parameter 'theta' of Generized Pareto Distribution "
1083 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
1084 DEFINE_double(iter_k, 0.0,
1085 "The parameter 'k' of Generized Pareto Distribution "
1086 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
1087 DEFINE_double(iter_sigma, 0.0,
1088 "The parameter 'sigma' of Generized Pareto Distribution "
1089 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
1090 DEFINE_double(mix_get_ratio, 1.0,
1091 "The ratio of Get queries of mix_graph workload");
1092 DEFINE_double(mix_put_ratio, 0.0,
1093 "The ratio of Put queries of mix_graph workload");
1094 DEFINE_double(mix_seek_ratio, 0.0,
1095 "The ratio of Seek queries of mix_graph workload");
1096 DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
1097 DEFINE_int64(mix_ave_kv_size, 512,
1098 "The average key-value size of this workload");
1099 DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
1100 DEFINE_double(
1101 sine_mix_rate_noise, 0.0,
1102 "Add the noise ratio to the sine rate, it is between 0.0 and 1.0");
1103 DEFINE_bool(sine_mix_rate, false,
1104 "Enable the sine QPS control on the mix workload");
1105 DEFINE_uint64(
1106 sine_mix_rate_interval_milliseconds, 10000,
1107 "Interval of which the sine wave read_rate_limit is recalculated");
1108 DEFINE_int64(mix_accesses, -1,
1109 "The total query accesses of mix_graph workload");
1110
1111 DEFINE_uint64(
1112 benchmark_read_rate_limit, 0,
1113 "If non-zero, db_bench will rate-limit the reads from RocksDB. This "
1114 "is the global rate in ops/second.");
1115
// Defaults read from a default-constructed Options follow the library.
DEFINE_uint64(max_compaction_bytes,
              ROCKSDB_NAMESPACE::Options().max_compaction_bytes,
              "Max bytes allowed in one compaction");

#ifndef ROCKSDB_LITE
DEFINE_bool(readonly, false, "Run read only benchmarks.");

DEFINE_bool(print_malloc_stats, false,
            "Print malloc stats to stdout after benchmarks finish.");
#endif  // ROCKSDB_LITE

DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");

// WAL retention / size limits.
DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
              " in MB.");
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");

// File I/O mode (mmap / O_DIRECT / fadvise).
DEFINE_bool(mmap_read, ROCKSDB_NAMESPACE::Options().allow_mmap_reads,
            "Allow reads to occur via mmap-ing files");

DEFINE_bool(mmap_write, ROCKSDB_NAMESPACE::Options().allow_mmap_writes,
            "Allow writes to occur via mmap-ing files");

DEFINE_bool(use_direct_reads, ROCKSDB_NAMESPACE::Options().use_direct_reads,
            "Use O_DIRECT for reading data");

DEFINE_bool(use_direct_io_for_flush_and_compaction,
            ROCKSDB_NAMESPACE::Options().use_direct_io_for_flush_and_compaction,
            "Use O_DIRECT for background flush and compaction writes");

DEFINE_bool(advise_random_on_open,
            ROCKSDB_NAMESPACE::Options().advise_random_on_open,
            "Advise random access on table file open");

DEFINE_string(compaction_fadvice, "NORMAL",
              "Access pattern advice when a file is compacted");
// Enum form of --compaction_fadvice, initialized from Options' default.
// NOTE(review): presumably overwritten from the string flag after parsing
// (not shown here) -- confirm.
static auto FLAGS_compaction_fadvice_e =
    ROCKSDB_NAMESPACE::Options().access_hint_on_compaction_start;

DEFINE_bool(use_tailing_iterator, false,
            "Use tailing iterator to access a series of keys instead of get");

DEFINE_bool(use_adaptive_mutex, ROCKSDB_NAMESPACE::Options().use_adaptive_mutex,
            "Use adaptive mutex");

// Incremental background syncing; 0 disables it.
DEFINE_uint64(bytes_per_sync, ROCKSDB_NAMESPACE::Options().bytes_per_sync,
              "Allows OS to incrementally sync SST files to disk while they are"
              " being written, in the background. Issue one request for every"
              " bytes_per_sync written. 0 turns it off.");

DEFINE_uint64(wal_bytes_per_sync,
              ROCKSDB_NAMESPACE::Options().wal_bytes_per_sync,
              "Allows OS to incrementally sync WAL files to disk while they are"
              " being written, in the background. Issue one request for every"
              " wal_bytes_per_sync written. 0 turns it off.");
1172
1173 DEFINE_bool(use_single_deletes, true,
1174 "Use single deletes (used in RandomReplaceKeys only).");
1175
1176 DEFINE_double(stddev, 2000.0,
1177 "Standard deviation of normal distribution used for picking keys"
1178 " (used in RandomReplaceKeys only).");
1179
1180 DEFINE_int32(key_id_range, 100000,
1181 "Range of possible value of key id (used in TimeSeries only).");
1182
1183 DEFINE_string(expire_style, "none",
1184 "Style to remove expired time entries. Can be one of the options "
1185 "below: none (do not expired data), compaction_filter (use a "
1186 "compaction filter to remove expired data), delete (seek IDs and "
1187 "remove expired data) (used in TimeSeries only).");
1188
1189 DEFINE_uint64(
1190 time_range, 100000,
1191 "Range of timestamp that store in the database (used in TimeSeries"
1192 " only).");
1193
1194 DEFINE_int32(num_deletion_threads, 1,
1195 "Number of threads to do deletion (used in TimeSeries and delete "
1196 "expire_style only).");
1197
1198 DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
1199 " operations on a key in the memtable");
1200
// gflags validator for --prefix_size: accepts 0 <= value < 2000000000 and
// prints a diagnostic to stderr otherwise. The message previously claimed
// the upper bound was inclusive even though 2000000000 itself is rejected.
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value >= 2000000000) {
    fprintf(stderr,
            "Invalid value for --%s: %d. 0 <= PrefixSize < 2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}
1209
// Checked by ValidatePrefixSize (registered further below).
DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
             "plain table");
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
             "per prefix, 0 means no special handling of the prefix, "
             "i.e. use the prefix comes with the generated random number.");
DEFINE_bool(total_order_seek, false,
            "Enable total order seek regardless of index format.");
DEFINE_bool(prefix_same_as_start, false,
            "Enforce iterator to return keys with prefix same as seek key.");
DEFINE_bool(
    seek_missing_prefix, false,
    "Iterator seek to keys with non-exist prefixes. Require prefix_size > 8");

DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
             "If non-zero, enable "
             "memtable insert with hint with the given prefix size.");
// Background-thread priority tweaks.
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
            "threads' IO priority");
DEFINE_bool(enable_cpu_prio, false, "Lower the background flush/compaction "
            "threads' CPU priority");
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
            "table becomes an identity function. This is only valid when key "
            "is 8 bytes");
// Statistics dumping / persistence; defaults follow Options().
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
DEFINE_uint64(stats_dump_period_sec,
              ROCKSDB_NAMESPACE::Options().stats_dump_period_sec,
              "Gap between printing stats to log in seconds");
DEFINE_uint64(stats_persist_period_sec,
              ROCKSDB_NAMESPACE::Options().stats_persist_period_sec,
              "Gap between persisting stats in seconds");
DEFINE_bool(persist_stats_to_disk,
            ROCKSDB_NAMESPACE::Options().persist_stats_to_disk,
            "whether to persist stats to disk");
DEFINE_uint64(stats_history_buffer_size,
              ROCKSDB_NAMESPACE::Options().stats_history_buffer_size,
              "Max number of stats snapshots to keep in memory");
// MultiGet benchmarking.
DEFINE_int64(multiread_stride, 0,
             "Stride length for the keys in a MultiGet batch");
DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
1249
// Memtable representation kinds; StringToRepFactory maps their command-line
// names onto these values.
enum RepFactory {
  kSkipList,
  kPrefixHash,
  kVectorRep,
  kHashLinkedList,
};

// Parses a memtable representation name (compared case-insensitively).
// Unknown names are reported on stdout and fall back to kSkipList.
static enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  if (strcasecmp(ctype, "skip_list") == 0) {
    return kSkipList;
  }
  if (strcasecmp(ctype, "prefix_hash") == 0) {
    return kPrefixHash;
  }
  if (strcasecmp(ctype, "vector") == 0) {
    return kVectorRep;
  }
  if (strcasecmp(ctype, "hash_linkedlist") == 0) {
    return kHashLinkedList;
  }

  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}
1272
1273 static enum RepFactory FLAGS_rep_factory;
1274 DEFINE_string(memtablerep, "skip_list", "");
1275 DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count");
1276 DEFINE_bool(use_plain_table, false, "if use plain table "
1277 "instead of block-based table format");
1278 DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
1279 DEFINE_double(cuckoo_hash_ratio, 0.9, "Hash ratio for Cuckoo SST table.");
1280 DEFINE_bool(use_hash_search, false, "if use kHashSearch "
1281 "instead of kBinarySearch. "
1282 "This is valid if only we use BlockTable");
1283 DEFINE_bool(use_block_based_filter, false, "if use kBlockBasedFilter "
1284 "instead of kFullFilter for filter block. "
1285 "This is valid if only we use BlockTable");
1286 DEFINE_string(merge_operator, "", "The merge operator to use with the database."
1287 "If a new merge operator is specified, be sure to use fresh"
1288 " database The possible merge operators are defined in"
1289 " utilities/merge_operators.h");
1290 DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
1291 "linear search first for this many steps from the previous "
1292 "position");
1293 DEFINE_bool(report_file_operations, false, "if report number of file "
1294 "operations");
1295 DEFINE_int32(readahead_size, 0, "Iterator readahead size");
1296
// gflags validator registration. Each *_dummy variable exists only so that
// RegisterFlagValidator runs during static initialization of this file; the
// bool it stores is never read (hence __unused__). ValidateKeySize,
// ValidateCacheNumshardbits and ValidateInt32Percent are defined elsewhere in
// the file.
static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);

static const bool FLAGS_hard_rate_limit_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit);

static const bool FLAGS_prefix_size_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);

static const bool FLAGS_key_size_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_key_size, &ValidateKeySize);

static const bool FLAGS_cache_numshardbits_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_cache_numshardbits,
                          &ValidateCacheNumshardbits);

static const bool FLAGS_readwritepercent_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_readwritepercent, &ValidateInt32Percent);

// Accepted but ignored (declared int32 with a bool default).
DEFINE_int32(disable_seek_compaction, false,
             "Not used, left here for backwards compatibility");

static const bool FLAGS_deletepercent_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_deletepercent, &ValidateInt32Percent);
static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                          &ValidateTableCacheNumshardbits);
1324
1325 namespace ROCKSDB_NAMESPACE {
1326
1327 namespace {
// Counters filled in by ReportFileOpEnv's file wrappers: number of files
// opened, read calls, append calls, and total bytes read / written. They are
// atomics updated with relaxed ordering (pure statistics). NOTE(review):
// presumably atomic because wrapped files are used from several threads.
struct ReportFileOpCounters {
  std::atomic<int> open_counter_;
  std::atomic<int> read_counter_;
  std::atomic<int> append_counter_;
  std::atomic<uint64_t> bytes_read_;
  std::atomic<uint64_t> bytes_written_;
};
1335
1336 // A special Env to records and report file operations in db_bench
1337 class ReportFileOpEnv : public EnvWrapper {
1338 public:
ReportFileOpEnv(Env * base)1339 explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }
1340
reset()1341 void reset() {
1342 counters_.open_counter_ = 0;
1343 counters_.read_counter_ = 0;
1344 counters_.append_counter_ = 0;
1345 counters_.bytes_read_ = 0;
1346 counters_.bytes_written_ = 0;
1347 }
1348
NewSequentialFile(const std::string & f,std::unique_ptr<SequentialFile> * r,const EnvOptions & soptions)1349 Status NewSequentialFile(const std::string& f,
1350 std::unique_ptr<SequentialFile>* r,
1351 const EnvOptions& soptions) override {
1352 class CountingFile : public SequentialFile {
1353 private:
1354 std::unique_ptr<SequentialFile> target_;
1355 ReportFileOpCounters* counters_;
1356
1357 public:
1358 CountingFile(std::unique_ptr<SequentialFile>&& target,
1359 ReportFileOpCounters* counters)
1360 : target_(std::move(target)), counters_(counters) {}
1361
1362 Status Read(size_t n, Slice* result, char* scratch) override {
1363 counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
1364 Status rv = target_->Read(n, result, scratch);
1365 counters_->bytes_read_.fetch_add(result->size(),
1366 std::memory_order_relaxed);
1367 return rv;
1368 }
1369
1370 Status Skip(uint64_t n) override { return target_->Skip(n); }
1371 };
1372
1373 Status s = target()->NewSequentialFile(f, r, soptions);
1374 if (s.ok()) {
1375 counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
1376 r->reset(new CountingFile(std::move(*r), counters()));
1377 }
1378 return s;
1379 }
1380
NewRandomAccessFile(const std::string & f,std::unique_ptr<RandomAccessFile> * r,const EnvOptions & soptions)1381 Status NewRandomAccessFile(const std::string& f,
1382 std::unique_ptr<RandomAccessFile>* r,
1383 const EnvOptions& soptions) override {
1384 class CountingFile : public RandomAccessFile {
1385 private:
1386 std::unique_ptr<RandomAccessFile> target_;
1387 ReportFileOpCounters* counters_;
1388
1389 public:
1390 CountingFile(std::unique_ptr<RandomAccessFile>&& target,
1391 ReportFileOpCounters* counters)
1392 : target_(std::move(target)), counters_(counters) {}
1393 Status Read(uint64_t offset, size_t n, Slice* result,
1394 char* scratch) const override {
1395 counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
1396 Status rv = target_->Read(offset, n, result, scratch);
1397 counters_->bytes_read_.fetch_add(result->size(),
1398 std::memory_order_relaxed);
1399 return rv;
1400 }
1401 };
1402
1403 Status s = target()->NewRandomAccessFile(f, r, soptions);
1404 if (s.ok()) {
1405 counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
1406 r->reset(new CountingFile(std::move(*r), counters()));
1407 }
1408 return s;
1409 }
1410
NewWritableFile(const std::string & f,std::unique_ptr<WritableFile> * r,const EnvOptions & soptions)1411 Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
1412 const EnvOptions& soptions) override {
1413 class CountingFile : public WritableFile {
1414 private:
1415 std::unique_ptr<WritableFile> target_;
1416 ReportFileOpCounters* counters_;
1417
1418 public:
1419 CountingFile(std::unique_ptr<WritableFile>&& target,
1420 ReportFileOpCounters* counters)
1421 : target_(std::move(target)), counters_(counters) {}
1422
1423 Status Append(const Slice& data) override {
1424 counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
1425 Status rv = target_->Append(data);
1426 counters_->bytes_written_.fetch_add(data.size(),
1427 std::memory_order_relaxed);
1428 return rv;
1429 }
1430
1431 Status Truncate(uint64_t size) override { return target_->Truncate(size); }
1432 Status Close() override { return target_->Close(); }
1433 Status Flush() override { return target_->Flush(); }
1434 Status Sync() override { return target_->Sync(); }
1435 };
1436
1437 Status s = target()->NewWritableFile(f, r, soptions);
1438 if (s.ok()) {
1439 counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
1440 r->reset(new CountingFile(std::move(*r), counters()));
1441 }
1442 return s;
1443 }
1444
1445 // getter
counters()1446 ReportFileOpCounters* counters() { return &counters_; }
1447
1448 private:
1449 ReportFileOpCounters counters_;
1450 };
1451
1452 } // namespace
1453
// Shapes available for the value-size distribution.
enum DistributionType : unsigned char {
  kFixed = 0,
  kUniform,
  kNormal
};

// Parsed value-size distribution; defaults to kFixed.
static enum DistributionType FLAGS_value_size_distribution_type_e = kFixed;

// Parses a distribution name (compared case-insensitively). Unknown names are
// reported on stdout and fall back to kFixed.
static enum DistributionType StringToDistributionType(const char* ctype) {
  assert(ctype);

  struct NamedDistribution {
    const char* name;
    DistributionType type;
  };
  static const NamedDistribution kKnownDistributions[] = {
      {"fixed", kFixed},
      {"uniform", kUniform},
      {"normal", kNormal},
  };

  for (const auto& entry : kKnownDistributions) {
    if (strcasecmp(ctype, entry.name) == 0) {
      return entry.type;
    }
  }

  fprintf(stdout, "Cannot parse distribution type '%s'\n", ctype);
  return kFixed;  // default value
}
1475
// Base class for value-size distributions.
//
// Generate() draws one raw sample from the subclass's Get() and, unless the
// subclass opts out via NeedTruncate(), clamps it into
// [min_value_size_, max_value_size_].
class BaseDistribution {
 public:
  BaseDistribution(unsigned int min, unsigned int max) :
    min_value_size_(min),
    max_value_size_(max) {}
  virtual ~BaseDistribution() {}

  unsigned int Generate() {
    auto val = Get();
    if (NeedTruncate()) {
      val = std::max(min_value_size_, val);
      val = std::min(max_value_size_, val);
    }
    return val;
  }
 private:
  // Produces one raw sample.
  virtual unsigned int Get() = 0;
  // Whether Generate() must clamp Get()'s result into [min, max].
  virtual bool NeedTruncate() {
    return true;
  }
  unsigned int min_value_size_;
  unsigned int max_value_size_;
};

// Always yields `size`; no clamping needed.
class FixedDistribution : public BaseDistribution
{
 public:
  FixedDistribution(unsigned int size) :
    BaseDistribution(size, size),
    size_(size) {}
 private:
  virtual unsigned int Get() override {
    return size_;
  }
  virtual bool NeedTruncate() override {
    return false;
  }
  unsigned int size_;
};

// Normally distributed sizes with mean (min + max) / 2 and standard deviation
// (max - min) / 6, so ~99.7% of raw samples fall inside [min, max]; the rest
// are clamped by BaseDistribution::Generate().
class NormalDistribution
    : public BaseDistribution, public std::normal_distribution<double> {
 public:
  NormalDistribution(unsigned int min, unsigned int max) :
    BaseDistribution(min, max),
    // 99.7% values within the range [min, max].
    // The mean is computed in double so (min + max) cannot wrap around.
    std::normal_distribution<double>(
        (static_cast<double>(min) + max) / 2.0 /*mean*/,
        (double)(max - min) / 6.0 /*stddev*/),
    gen_(rd_()) {}
 private:
  virtual unsigned int Get() override {
    // A raw sample can be negative or exceed the largest unsigned int (the
    // tails extend past [min, max]). Converting such a double to unsigned int
    // is undefined behavior, so clamp into the representable range first;
    // BaseDistribution::Generate() then clamps into [min, max].
    const double kMaxUnsigned = static_cast<double>(~0u);
    const double sample = (*this)(gen_);
    return static_cast<unsigned int>(
        std::min(std::max(sample, 0.0), kMaxUnsigned));
  }
  std::random_device rd_;
  std::mt19937 gen_;
};

// Uniformly distributed integer sizes in [min, max]; never needs clamping.
class UniformDistribution
    : public BaseDistribution,
      public std::uniform_int_distribution<unsigned int> {
 public:
  UniformDistribution(unsigned int min, unsigned int max) :
    BaseDistribution(min, max),
    std::uniform_int_distribution<unsigned int>(min, max),
    gen_(rd_()) {}
 private:
  virtual unsigned int Get() override {
    return (*this)(gen_);
  }
  virtual bool NeedTruncate() override {
    return false;
  }
  std::random_device rd_;
  std::mt19937 gen_;
};
1551
1552 // Helper for quickly generating random data.
1553 class RandomGenerator {
1554 private:
1555 std::string data_;
1556 unsigned int pos_;
1557 std::unique_ptr<BaseDistribution> dist_;
1558
1559 public:
1560
RandomGenerator()1561 RandomGenerator() {
1562 auto max_value_size = FLAGS_value_size_max;
1563 switch (FLAGS_value_size_distribution_type_e) {
1564 case kUniform:
1565 dist_.reset(new UniformDistribution(FLAGS_value_size_min,
1566 FLAGS_value_size_max));
1567 break;
1568 case kNormal:
1569 dist_.reset(new NormalDistribution(FLAGS_value_size_min,
1570 FLAGS_value_size_max));
1571 break;
1572 case kFixed:
1573 default:
1574 dist_.reset(new FixedDistribution(value_size));
1575 max_value_size = value_size;
1576 }
1577 // We use a limited amount of data over and over again and ensure
1578 // that it is larger than the compression window (32KB), and also
1579 // large enough to serve all typical value sizes we want to write.
1580 Random rnd(301);
1581 std::string piece;
1582 while (data_.size() < (unsigned)std::max(1048576, max_value_size)) {
1583 // Add a short fragment that is as compressible as specified
1584 // by FLAGS_compression_ratio.
1585 test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
1586 data_.append(piece);
1587 }
1588 pos_ = 0;
1589 }
1590
Generate(unsigned int len)1591 Slice Generate(unsigned int len) {
1592 assert(len <= data_.size());
1593 if (pos_ + len > data_.size()) {
1594 pos_ = 0;
1595 }
1596 pos_ += len;
1597 return Slice(data_.data() + pos_ - len, len);
1598 }
1599
Generate()1600 Slice Generate() {
1601 auto len = dist_->Generate();
1602 return Generate(len);
1603 }
1604 };
1605
AppendWithSpace(std::string * str,Slice msg)1606 static void AppendWithSpace(std::string* str, Slice msg) {
1607 if (msg.empty()) return;
1608 if (!str->empty()) {
1609 str->push_back(' ');
1610 }
1611 str->append(msg.data(), msg.size());
1612 }
1613
1614 struct DBWithColumnFamilies {
1615 std::vector<ColumnFamilyHandle*> cfh;
1616 DB* db;
1617 #ifndef ROCKSDB_LITE
1618 OptimisticTransactionDB* opt_txn_db;
1619 #endif // ROCKSDB_LITE
1620 std::atomic<size_t> num_created; // Need to be updated after all the
1621 // new entries in cfh are set.
1622 size_t num_hot; // Number of column families to be queried at each moment.
1623 // After each CreateNewCf(), another num_hot number of new
1624 // Column families will be created and used to be queried.
1625 port::Mutex create_cf_mutex; // Only one thread can execute CreateNewCf()
1626 std::vector<int> cfh_idx_to_prob; // ith index holds probability of operating
1627 // on cfh[i].
1628
DBWithColumnFamiliesROCKSDB_NAMESPACE::DBWithColumnFamilies1629 DBWithColumnFamilies()
1630 : db(nullptr)
1631 #ifndef ROCKSDB_LITE
1632 , opt_txn_db(nullptr)
1633 #endif // ROCKSDB_LITE
1634 {
1635 cfh.clear();
1636 num_created = 0;
1637 num_hot = 0;
1638 }
1639
DBWithColumnFamiliesROCKSDB_NAMESPACE::DBWithColumnFamilies1640 DBWithColumnFamilies(const DBWithColumnFamilies& other)
1641 : cfh(other.cfh),
1642 db(other.db),
1643 #ifndef ROCKSDB_LITE
1644 opt_txn_db(other.opt_txn_db),
1645 #endif // ROCKSDB_LITE
1646 num_created(other.num_created.load()),
1647 num_hot(other.num_hot),
1648 cfh_idx_to_prob(other.cfh_idx_to_prob) {
1649 }
1650
DeleteDBsROCKSDB_NAMESPACE::DBWithColumnFamilies1651 void DeleteDBs() {
1652 std::for_each(cfh.begin(), cfh.end(),
1653 [](ColumnFamilyHandle* cfhi) { delete cfhi; });
1654 cfh.clear();
1655 #ifndef ROCKSDB_LITE
1656 if (opt_txn_db) {
1657 delete opt_txn_db;
1658 opt_txn_db = nullptr;
1659 } else {
1660 delete db;
1661 db = nullptr;
1662 }
1663 #else
1664 delete db;
1665 db = nullptr;
1666 #endif // ROCKSDB_LITE
1667 }
1668
GetCfhROCKSDB_NAMESPACE::DBWithColumnFamilies1669 ColumnFamilyHandle* GetCfh(int64_t rand_num) {
1670 assert(num_hot > 0);
1671 size_t rand_offset = 0;
1672 if (!cfh_idx_to_prob.empty()) {
1673 assert(cfh_idx_to_prob.size() == num_hot);
1674 int sum = 0;
1675 while (sum + cfh_idx_to_prob[rand_offset] < rand_num % 100) {
1676 sum += cfh_idx_to_prob[rand_offset];
1677 ++rand_offset;
1678 }
1679 assert(rand_offset < cfh_idx_to_prob.size());
1680 } else {
1681 rand_offset = rand_num % num_hot;
1682 }
1683 return cfh[num_created.load(std::memory_order_acquire) - num_hot +
1684 rand_offset];
1685 }
1686
1687 // stage: assume CF from 0 to stage * num_hot has be created. Need to create
1688 // stage * num_hot + 1 to stage * (num_hot + 1).
CreateNewCfROCKSDB_NAMESPACE::DBWithColumnFamilies1689 void CreateNewCf(ColumnFamilyOptions options, int64_t stage) {
1690 MutexLock l(&create_cf_mutex);
1691 if ((stage + 1) * num_hot <= num_created) {
1692 // Already created.
1693 return;
1694 }
1695 auto new_num_created = num_created + num_hot;
1696 assert(new_num_created <= cfh.size());
1697 for (size_t i = num_created; i < new_num_created; i++) {
1698 Status s =
1699 db->CreateColumnFamily(options, ColumnFamilyName(i), &(cfh[i]));
1700 if (!s.ok()) {
1701 fprintf(stderr, "create column family error: %s\n",
1702 s.ToString().c_str());
1703 abort();
1704 }
1705 }
1706 num_created.store(new_num_created, std::memory_order_release);
1707 }
1708 };
1709
// a class that reports stats to CSV file
//
// A background thread appends one row per report_interval_secs to the file
// given to the constructor. Each row has the form
// "<seconds since start, rounded>,<ops reported since the previous row>".
class ReporterAgent {
 public:
  // Creates `fname` through `env` and writes the CSV header line. The process
  // aborts if the file cannot be created or the header cannot be written and
  // flushed. The reporting thread is started last, after every member it
  // touches has been initialized.
  ReporterAgent(Env* env, const std::string& fname,
                uint64_t report_interval_secs)
      : env_(env),
        total_ops_done_(0),
        last_report_(0),
        report_interval_secs_(report_interval_secs),
        stop_(false) {
    auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions());
    if (s.ok()) {
      s = report_file_->Append(Header() + "\n");
    }
    if (s.ok()) {
      s = report_file_->Flush();
    }
    if (!s.ok()) {
      fprintf(stderr, "Can't open %s: %s\n", fname.c_str(),
              s.ToString().c_str());
      abort();
    }

    reporting_thread_ = port::Thread([&]() { SleepAndReport(); });
  }

  // Sets stop_ under mutex_, wakes the reporting thread, and joins it.
  ~ReporterAgent() {
    {
      std::unique_lock<std::mutex> lk(mutex_);
      stop_ = true;
      stop_cv_.notify_all();
    }
    reporting_thread_.join();
  }

  // thread safe
  // Adds num_ops to the running total with an atomic fetch_add.
  void ReportFinishedOps(int64_t num_ops) {
    total_ops_done_.fetch_add(num_ops);
  }

 private:
  // CSV column names. Every row written by SleepAndReport() follows this order.
  std::string Header() const { return "secs_elapsed,interval_qps"; }

  // Body of the reporting thread. Waits up to report_interval_secs_ (returning
  // early if stop_ is set), then appends and flushes one row. Exits when
  // stopped or on the first Append/Flush failure.
  void SleepAndReport() {
    auto time_started = env_->NowMicros();
    while (true) {
      {
        std::unique_lock<std::mutex> lk(mutex_);
        if (stop_ ||
            stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_),
                              [&]() { return stop_; })) {
          // stopping
          break;
        }
        // else -> timeout, which means time for a report!
      }
      auto total_ops_done_snapshot = total_ops_done_.load();
      // round the seconds elapsed
      auto secs_elapsed =
          (env_->NowMicros() - time_started + kMicrosInSecond / 2) /
          kMicrosInSecond;
      // Second column: ops reported since the previous row.
      std::string report = ToString(secs_elapsed) + "," +
                           ToString(total_ops_done_snapshot - last_report_) +
                           "\n";
      auto s = report_file_->Append(report);
      if (s.ok()) {
        s = report_file_->Flush();
      }
      if (!s.ok()) {
        fprintf(stderr,
                "Can't write to report file (%s), stopping the reporting\n",
                s.ToString().c_str());
        break;
      }
      // Only the reporting thread reads or writes last_report_.
      last_report_ = total_ops_done_snapshot;
    }
  }

  Env* env_;
  std::unique_ptr<WritableFile> report_file_;
  std::atomic<int64_t> total_ops_done_;
  int64_t last_report_;
  const uint64_t report_interval_secs_;
  ROCKSDB_NAMESPACE::port::Thread reporting_thread_;
  std::mutex mutex_;
  // will notify on stop
  std::condition_variable stop_cv_;
  bool stop_;
};
1798
// Kinds of operations a benchmark can record latency for (see
// Stats::FinishedOps).
enum OperationType : unsigned char {
  kRead = 0,
  kWrite,
  kDelete,
  kSeek,
  kMerge,
  kUpdate,
  kCompress,
  kUncompress,
  kCrc,
  kHash,
  kOthers
};

// Human-readable name of every OperationType. Stats::Report() uses it to
// label the per-operation latency histograms.
//
// Note: this table previously listed kCompress twice ({kCompress,
// "uncompress"}). An unordered_map ignores duplicate keys in an initializer
// list, so kUncompress had no name at all.
static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
    OperationTypeString = {
      {kRead, "read"},
      {kWrite, "write"},
      {kDelete, "delete"},
      {kSeek, "seek"},
      {kMerge, "merge"},
      {kUpdate, "update"},
      {kCompress, "compress"},
      {kUncompress, "uncompress"},
      {kCrc, "crc"},
      {kHash, "hash"},
      {kOthers, "op"}
    };
1827
1828 class CombinedStats;
1829 class Stats {
1830 private:
1831 int id_;
1832 uint64_t start_;
1833 uint64_t sine_interval_;
1834 uint64_t finish_;
1835 double seconds_;
1836 uint64_t done_;
1837 uint64_t last_report_done_;
1838 uint64_t next_report_;
1839 uint64_t bytes_;
1840 uint64_t last_op_finish_;
1841 uint64_t last_report_finish_;
1842 std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
1843 std::hash<unsigned char>> hist_;
1844 std::string message_;
1845 bool exclude_from_merge_;
1846 ReporterAgent* reporter_agent_; // does not own
1847 friend class CombinedStats;
1848
1849 public:
Stats()1850 Stats() { Start(-1); }
1851
SetReporterAgent(ReporterAgent * reporter_agent)1852 void SetReporterAgent(ReporterAgent* reporter_agent) {
1853 reporter_agent_ = reporter_agent;
1854 }
1855
Start(int id)1856 void Start(int id) {
1857 id_ = id;
1858 next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
1859 last_op_finish_ = start_;
1860 hist_.clear();
1861 done_ = 0;
1862 last_report_done_ = 0;
1863 bytes_ = 0;
1864 seconds_ = 0;
1865 start_ = FLAGS_env->NowMicros();
1866 sine_interval_ = FLAGS_env->NowMicros();
1867 finish_ = start_;
1868 last_report_finish_ = start_;
1869 message_.clear();
1870 // When set, stats from this thread won't be merged with others.
1871 exclude_from_merge_ = false;
1872 }
1873
Merge(const Stats & other)1874 void Merge(const Stats& other) {
1875 if (other.exclude_from_merge_)
1876 return;
1877
1878 for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
1879 auto this_it = hist_.find(it->first);
1880 if (this_it != hist_.end()) {
1881 this_it->second->Merge(*(other.hist_.at(it->first)));
1882 } else {
1883 hist_.insert({ it->first, it->second });
1884 }
1885 }
1886
1887 done_ += other.done_;
1888 bytes_ += other.bytes_;
1889 seconds_ += other.seconds_;
1890 if (other.start_ < start_) start_ = other.start_;
1891 if (other.finish_ > finish_) finish_ = other.finish_;
1892
1893 // Just keep the messages from one thread
1894 if (message_.empty()) message_ = other.message_;
1895 }
1896
Stop()1897 void Stop() {
1898 finish_ = FLAGS_env->NowMicros();
1899 seconds_ = (finish_ - start_) * 1e-6;
1900 }
1901
AddMessage(Slice msg)1902 void AddMessage(Slice msg) {
1903 AppendWithSpace(&message_, msg);
1904 }
1905
SetId(int id)1906 void SetId(int id) { id_ = id; }
SetExcludeFromMerge()1907 void SetExcludeFromMerge() { exclude_from_merge_ = true; }
1908
PrintThreadStatus()1909 void PrintThreadStatus() {
1910 std::vector<ThreadStatus> thread_list;
1911 FLAGS_env->GetThreadList(&thread_list);
1912
1913 fprintf(stderr, "\n%18s %10s %12s %20s %13s %45s %12s %s\n",
1914 "ThreadID", "ThreadType", "cfName", "Operation",
1915 "ElapsedTime", "Stage", "State", "OperationProperties");
1916
1917 int64_t current_time = 0;
1918 FLAGS_env->GetCurrentTime(¤t_time);
1919 for (auto ts : thread_list) {
1920 fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
1921 ts.thread_id,
1922 ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
1923 ts.cf_name.c_str(),
1924 ThreadStatus::GetOperationName(ts.operation_type).c_str(),
1925 ThreadStatus::MicrosToString(ts.op_elapsed_micros).c_str(),
1926 ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(),
1927 ThreadStatus::GetStateName(ts.state_type).c_str());
1928
1929 auto op_properties = ThreadStatus::InterpretOperationProperties(
1930 ts.operation_type, ts.op_properties);
1931 for (const auto& op_prop : op_properties) {
1932 fprintf(stderr, " %s %" PRIu64" |",
1933 op_prop.first.c_str(), op_prop.second);
1934 }
1935 fprintf(stderr, "\n");
1936 }
1937 }
1938
ResetSineInterval()1939 void ResetSineInterval() {
1940 sine_interval_ = FLAGS_env->NowMicros();
1941 }
1942
GetSineInterval()1943 uint64_t GetSineInterval() {
1944 return sine_interval_;
1945 }
1946
GetStart()1947 uint64_t GetStart() {
1948 return start_;
1949 }
1950
ResetLastOpTime()1951 void ResetLastOpTime() {
1952 // Set to now to avoid latency from calls to SleepForMicroseconds
1953 last_op_finish_ = FLAGS_env->NowMicros();
1954 }
1955
FinishedOps(DBWithColumnFamilies * db_with_cfh,DB * db,int64_t num_ops,enum OperationType op_type=kOthers)1956 void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
1957 enum OperationType op_type = kOthers) {
1958 if (reporter_agent_) {
1959 reporter_agent_->ReportFinishedOps(num_ops);
1960 }
1961 if (FLAGS_histogram) {
1962 uint64_t now = FLAGS_env->NowMicros();
1963 uint64_t micros = now - last_op_finish_;
1964
1965 if (hist_.find(op_type) == hist_.end())
1966 {
1967 auto hist_temp = std::make_shared<HistogramImpl>();
1968 hist_.insert({op_type, std::move(hist_temp)});
1969 }
1970 hist_[op_type]->Add(micros);
1971
1972 if (micros > 20000 && !FLAGS_stats_interval) {
1973 fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
1974 fflush(stderr);
1975 }
1976 last_op_finish_ = now;
1977 }
1978
1979 done_ += num_ops;
1980 if (done_ >= next_report_) {
1981 if (!FLAGS_stats_interval) {
1982 if (next_report_ < 1000) next_report_ += 100;
1983 else if (next_report_ < 5000) next_report_ += 500;
1984 else if (next_report_ < 10000) next_report_ += 1000;
1985 else if (next_report_ < 50000) next_report_ += 5000;
1986 else if (next_report_ < 100000) next_report_ += 10000;
1987 else if (next_report_ < 500000) next_report_ += 50000;
1988 else next_report_ += 100000;
1989 fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
1990 } else {
1991 uint64_t now = FLAGS_env->NowMicros();
1992 int64_t usecs_since_last = now - last_report_finish_;
1993
1994 // Determine whether to print status where interval is either
1995 // each N operations or each N seconds.
1996
1997 if (FLAGS_stats_interval_seconds &&
1998 usecs_since_last < (FLAGS_stats_interval_seconds * 1000000)) {
1999 // Don't check again for this many operations
2000 next_report_ += FLAGS_stats_interval;
2001
2002 } else {
2003
2004 fprintf(stderr,
2005 "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
2006 "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
2007 FLAGS_env->TimeToString(now/1000000).c_str(),
2008 id_,
2009 done_ - last_report_done_, done_,
2010 (done_ - last_report_done_) /
2011 (usecs_since_last / 1000000.0),
2012 done_ / ((now - start_) / 1000000.0),
2013 (now - last_report_finish_) / 1000000.0,
2014 (now - start_) / 1000000.0);
2015
2016 if (id_ == 0 && FLAGS_stats_per_interval) {
2017 std::string stats;
2018
2019 if (db_with_cfh && db_with_cfh->num_created.load()) {
2020 for (size_t i = 0; i < db_with_cfh->num_created.load(); ++i) {
2021 if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats",
2022 &stats))
2023 fprintf(stderr, "%s\n", stats.c_str());
2024 if (FLAGS_show_table_properties) {
2025 for (int level = 0; level < FLAGS_num_levels; ++level) {
2026 if (db->GetProperty(
2027 db_with_cfh->cfh[i],
2028 "rocksdb.aggregated-table-properties-at-level" +
2029 ToString(level),
2030 &stats)) {
2031 if (stats.find("# entries=0") == std::string::npos) {
2032 fprintf(stderr, "Level[%d]: %s\n", level,
2033 stats.c_str());
2034 }
2035 }
2036 }
2037 }
2038 }
2039 } else if (db) {
2040 if (db->GetProperty("rocksdb.stats", &stats)) {
2041 fprintf(stderr, "%s\n", stats.c_str());
2042 }
2043 if (FLAGS_show_table_properties) {
2044 for (int level = 0; level < FLAGS_num_levels; ++level) {
2045 if (db->GetProperty(
2046 "rocksdb.aggregated-table-properties-at-level" +
2047 ToString(level),
2048 &stats)) {
2049 if (stats.find("# entries=0") == std::string::npos) {
2050 fprintf(stderr, "Level[%d]: %s\n", level, stats.c_str());
2051 }
2052 }
2053 }
2054 }
2055 }
2056 }
2057
2058 next_report_ += FLAGS_stats_interval;
2059 last_report_finish_ = now;
2060 last_report_done_ = done_;
2061 }
2062 }
2063 if (id_ == 0 && FLAGS_thread_status_per_interval) {
2064 PrintThreadStatus();
2065 }
2066 fflush(stderr);
2067 }
2068 }
2069
AddBytes(int64_t n)2070 void AddBytes(int64_t n) {
2071 bytes_ += n;
2072 }
2073
Report(const Slice & name)2074 void Report(const Slice& name) {
2075 // Pretend at least one op was done in case we are running a benchmark
2076 // that does not call FinishedOps().
2077 if (done_ < 1) done_ = 1;
2078
2079 std::string extra;
2080 if (bytes_ > 0) {
2081 // Rate is computed on actual elapsed time, not the sum of per-thread
2082 // elapsed times.
2083 double elapsed = (finish_ - start_) * 1e-6;
2084 char rate[100];
2085 snprintf(rate, sizeof(rate), "%6.1f MB/s",
2086 (bytes_ / 1048576.0) / elapsed);
2087 extra = rate;
2088 }
2089 AppendWithSpace(&extra, message_);
2090 double elapsed = (finish_ - start_) * 1e-6;
2091 double throughput = (double)done_/elapsed;
2092
2093 fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
2094 name.ToString().c_str(),
2095 seconds_ * 1e6 / done_,
2096 (long)throughput,
2097 (extra.empty() ? "" : " "),
2098 extra.c_str());
2099 if (FLAGS_histogram) {
2100 for (auto it = hist_.begin(); it != hist_.end(); ++it) {
2101 fprintf(stdout, "Microseconds per %s:\n%s\n",
2102 OperationTypeString[it->first].c_str(),
2103 it->second->ToString().c_str());
2104 }
2105 }
2106 if (FLAGS_report_file_operations) {
2107 ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
2108 ReportFileOpCounters* counters = env->counters();
2109 fprintf(stdout, "Num files opened: %d\n",
2110 counters->open_counter_.load(std::memory_order_relaxed));
2111 fprintf(stdout, "Num Read(): %d\n",
2112 counters->read_counter_.load(std::memory_order_relaxed));
2113 fprintf(stdout, "Num Append(): %d\n",
2114 counters->append_counter_.load(std::memory_order_relaxed));
2115 fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
2116 counters->bytes_read_.load(std::memory_order_relaxed));
2117 fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
2118 counters->bytes_written_.load(std::memory_order_relaxed));
2119 env->reset();
2120 }
2121 fflush(stdout);
2122 }
2123 };
2124
2125 class CombinedStats {
2126 public:
AddStats(const Stats & stat)2127 void AddStats(const Stats& stat) {
2128 uint64_t total_ops = stat.done_;
2129 uint64_t total_bytes_ = stat.bytes_;
2130 double elapsed;
2131
2132 if (total_ops < 1) {
2133 total_ops = 1;
2134 }
2135
2136 elapsed = (stat.finish_ - stat.start_) * 1e-6;
2137 throughput_ops_.emplace_back(total_ops / elapsed);
2138
2139 if (total_bytes_ > 0) {
2140 double mbs = (total_bytes_ / 1048576.0);
2141 throughput_mbs_.emplace_back(mbs / elapsed);
2142 }
2143 }
2144
Report(const std::string & bench_name)2145 void Report(const std::string& bench_name) {
2146 const char* name = bench_name.c_str();
2147 int num_runs = static_cast<int>(throughput_ops_.size());
2148
2149 if (throughput_mbs_.size() == throughput_ops_.size()) {
2150 fprintf(stdout,
2151 "%s [AVG %d runs] : %d ops/sec; %6.1f MB/sec\n"
2152 "%s [MEDIAN %d runs] : %d ops/sec; %6.1f MB/sec\n",
2153 name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)),
2154 CalcAvg(throughput_mbs_), name, num_runs,
2155 static_cast<int>(CalcMedian(throughput_ops_)),
2156 CalcMedian(throughput_mbs_));
2157 } else {
2158 fprintf(stdout,
2159 "%s [AVG %d runs] : %d ops/sec\n"
2160 "%s [MEDIAN %d runs] : %d ops/sec\n",
2161 name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)), name,
2162 num_runs, static_cast<int>(CalcMedian(throughput_ops_)));
2163 }
2164 }
2165
2166 private:
CalcAvg(std::vector<double> data)2167 double CalcAvg(std::vector<double> data) {
2168 double avg = 0;
2169 for (double x : data) {
2170 avg += x;
2171 }
2172 avg = avg / data.size();
2173 return avg;
2174 }
2175
CalcMedian(std::vector<double> data)2176 double CalcMedian(std::vector<double> data) {
2177 assert(data.size() > 0);
2178 std::sort(data.begin(), data.end());
2179
2180 size_t mid = data.size() / 2;
2181 if (data.size() % 2 == 1) {
2182 // Odd number of entries
2183 return data[mid];
2184 } else {
2185 // Even number of entries
2186 return (data[mid] + data[mid - 1]) / 2;
2187 }
2188 }
2189
2190 std::vector<double> throughput_ops_;
2191 std::vector<double> throughput_mbs_;
2192 };
2193
// An atomic counter used as an emulated logical clock. ExpiredTimeFilter
// reads it (through KeyExpired()) to decide which keys have expired.
class TimestampEmulator {
 public:
  TimestampEmulator() : timestamp_(0) {}

  // Returns the current emulated timestamp.
  uint64_t Get() const { return timestamp_.load(); }

  // Advances the emulated timestamp by one tick.
  void Inc() { timestamp_.fetch_add(1); }

 private:
  std::atomic<uint64_t> timestamp_;
};
2203
2204 // State shared by all concurrent executions of the same benchmark.
2205 struct SharedState {
2206 port::Mutex mu;
2207 port::CondVar cv;
2208 int total;
2209 int perf_level;
2210 std::shared_ptr<RateLimiter> write_rate_limiter;
2211 std::shared_ptr<RateLimiter> read_rate_limiter;
2212
2213 // Each thread goes through the following states:
2214 // (1) initializing
2215 // (2) waiting for others to be initialized
2216 // (3) running
2217 // (4) done
2218
2219 long num_initialized;
2220 long num_done;
2221 bool start;
2222
SharedStateROCKSDB_NAMESPACE::SharedState2223 SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
2224 };
2225
2226 // Per-thread state for concurrent executions of the same benchmark.
2227 struct ThreadState {
2228 int tid; // 0..n-1 when running in n threads
2229 Random64 rand; // Has different seeds for different threads
2230 Stats stats;
2231 SharedState* shared;
2232
ThreadStateROCKSDB_NAMESPACE::ThreadState2233 /* implicit */ ThreadState(int index)
2234 : tid(index),
2235 rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
2236 }
2237 };
2238
2239 class Duration {
2240 public:
Duration(uint64_t max_seconds,int64_t max_ops,int64_t ops_per_stage=0)2241 Duration(uint64_t max_seconds, int64_t max_ops, int64_t ops_per_stage = 0) {
2242 max_seconds_ = max_seconds;
2243 max_ops_= max_ops;
2244 ops_per_stage_ = (ops_per_stage > 0) ? ops_per_stage : max_ops;
2245 ops_ = 0;
2246 start_at_ = FLAGS_env->NowMicros();
2247 }
2248
GetStage()2249 int64_t GetStage() { return std::min(ops_, max_ops_ - 1) / ops_per_stage_; }
2250
Done(int64_t increment)2251 bool Done(int64_t increment) {
2252 if (increment <= 0) increment = 1; // avoid Done(0) and infinite loops
2253 ops_ += increment;
2254
2255 if (max_seconds_) {
2256 // Recheck every appx 1000 ops (exact iff increment is factor of 1000)
2257 auto granularity = FLAGS_ops_between_duration_checks;
2258 if ((ops_ / granularity) != ((ops_ - increment) / granularity)) {
2259 uint64_t now = FLAGS_env->NowMicros();
2260 return ((now - start_at_) / 1000000) >= max_seconds_;
2261 } else {
2262 return false;
2263 }
2264 } else {
2265 return ops_ > max_ops_;
2266 }
2267 }
2268
2269 private:
2270 uint64_t max_seconds_;
2271 int64_t max_ops_;
2272 int64_t ops_per_stage_;
2273 int64_t ops_;
2274 uint64_t start_at_;
2275 };
2276
2277 class Benchmark {
 private:
  // Built by NewCache(FLAGS_cache_size) in the constructor; nullptr when the
  // size is <= 0.
  std::shared_ptr<Cache> cache_;
  // Built by NewCache(FLAGS_compressed_cache_size); nullptr when <= 0.
  std::shared_ptr<Cache> compressed_cache_;
  // Bloom filter policy from FLAGS_bloom_bits; nullptr when bloom_bits < 0.
  std::shared_ptr<const FilterPolicy> filter_policy_;
  // From NewFixedPrefixTransform(FLAGS_prefix_size). NOTE(review): raw
  // pointer; where it is released is not visible here — TODO confirm.
  const SliceTransform* prefix_extractor_;
  // The DB under test plus its column-family handles.
  DBWithColumnFamilies db_;
  // Presumably used when benchmarking several DBs at once — verify.
  std::vector<DBWithColumnFamilies> multi_dbs_;
  int64_t num_;               // FLAGS_num
  int key_size_;              // FLAGS_key_size
  int prefix_size_;           // FLAGS_prefix_size
  int64_t keys_per_prefix_;   // FLAGS_keys_per_prefix
  int64_t entries_per_batch_;  // starts at 1 in the constructor
  int64_t writes_before_delete_range_;
  int64_t writes_per_range_tombstone_;
  int64_t range_tombstone_width_;
  int64_t max_num_range_tombstones_;
  WriteOptions write_options_;
  Options open_options_;  // keep options around to properly destroy db later
#ifndef ROCKSDB_LITE
  TraceOptions trace_options_;
  TraceOptions block_cache_trace_options_;
#endif
  int64_t reads_;  // FLAGS_reads, or FLAGS_num when FLAGS_reads < 0
  int64_t deletes_;
  double read_random_exp_range_;  // starts at 0.0 in the constructor
  int64_t writes_;  // FLAGS_writes, or FLAGS_num when FLAGS_writes < 0
  int64_t readwrites_;
  int64_t merge_keys_;
  bool report_file_operations_;
  bool use_blob_db_;
  std::vector<std::string> keys_;
2309
2310 class ErrorHandlerListener : public EventListener {
2311 public:
2312 #ifndef ROCKSDB_LITE
ErrorHandlerListener()2313 ErrorHandlerListener()
2314 : mutex_(),
2315 cv_(&mutex_),
2316 no_auto_recovery_(false),
2317 recovery_complete_(false) {}
2318
~ErrorHandlerListener()2319 ~ErrorHandlerListener() override {}
2320
OnErrorRecoveryBegin(BackgroundErrorReason,Status,bool * auto_recovery)2321 void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
2322 Status /*bg_error*/,
2323 bool* auto_recovery) override {
2324 if (*auto_recovery && no_auto_recovery_) {
2325 *auto_recovery = false;
2326 }
2327 }
2328
OnErrorRecoveryCompleted(Status)2329 void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
2330 InstrumentedMutexLock l(&mutex_);
2331 recovery_complete_ = true;
2332 cv_.SignalAll();
2333 }
2334
WaitForRecovery(uint64_t abs_time_us)2335 bool WaitForRecovery(uint64_t abs_time_us) {
2336 InstrumentedMutexLock l(&mutex_);
2337 if (!recovery_complete_) {
2338 cv_.TimedWait(abs_time_us);
2339 }
2340 if (recovery_complete_) {
2341 recovery_complete_ = false;
2342 return true;
2343 }
2344 return false;
2345 }
2346
EnableAutoRecovery(bool enable=true)2347 void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
2348
2349 private:
2350 InstrumentedMutex mutex_;
2351 InstrumentedCondVar cv_;
2352 bool no_auto_recovery_;
2353 bool recovery_complete_;
2354 #else // ROCKSDB_LITE
2355 bool WaitForRecovery(uint64_t /*abs_time_us*/) { return true; }
2356 void EnableAutoRecovery(bool /*enable*/) {}
2357 #endif // ROCKSDB_LITE
2358 };
2359
2360 std::shared_ptr<ErrorHandlerListener> listener_;
2361
SanityCheck()2362 bool SanityCheck() {
2363 if (FLAGS_compression_ratio > 1) {
2364 fprintf(stderr, "compression_ratio should be between 0 and 1\n");
2365 return false;
2366 }
2367 return true;
2368 }
2369
CompressSlice(const CompressionInfo & compression_info,const Slice & input,std::string * compressed)2370 inline bool CompressSlice(const CompressionInfo& compression_info,
2371 const Slice& input, std::string* compressed) {
2372 bool ok = true;
2373 switch (FLAGS_compression_type_e) {
2374 case ROCKSDB_NAMESPACE::kSnappyCompression:
2375 ok = Snappy_Compress(compression_info, input.data(), input.size(),
2376 compressed);
2377 break;
2378 case ROCKSDB_NAMESPACE::kZlibCompression:
2379 ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
2380 compressed);
2381 break;
2382 case ROCKSDB_NAMESPACE::kBZip2Compression:
2383 ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
2384 compressed);
2385 break;
2386 case ROCKSDB_NAMESPACE::kLZ4Compression:
2387 ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
2388 compressed);
2389 break;
2390 case ROCKSDB_NAMESPACE::kLZ4HCCompression:
2391 ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
2392 compressed);
2393 break;
2394 case ROCKSDB_NAMESPACE::kXpressCompression:
2395 ok = XPRESS_Compress(input.data(),
2396 input.size(), compressed);
2397 break;
2398 case ROCKSDB_NAMESPACE::kZSTD:
2399 ok = ZSTD_Compress(compression_info, input.data(), input.size(),
2400 compressed);
2401 break;
2402 default:
2403 ok = false;
2404 }
2405 return ok;
2406 }
2407
PrintHeader()2408 void PrintHeader() {
2409 PrintEnvironment();
2410 fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size);
2411 auto avg_value_size = FLAGS_value_size;
2412 if (FLAGS_value_size_distribution_type_e == kFixed) {
2413 fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
2414 avg_value_size,
2415 static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
2416 } else {
2417 avg_value_size = (FLAGS_value_size_min + FLAGS_value_size_max) / 2;
2418 fprintf(stdout, "Values: %d avg bytes each (%d bytes after compression)\n",
2419 avg_value_size,
2420 static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
2421 fprintf(stdout, "Values Distribution: %s (min: %d, max: %d)\n",
2422 FLAGS_value_size_distribution_type.c_str(),
2423 FLAGS_value_size_min, FLAGS_value_size_max);
2424 }
2425 fprintf(stdout, "Entries: %" PRIu64 "\n", num_);
2426 fprintf(stdout, "Prefix: %d bytes\n", FLAGS_prefix_size);
2427 fprintf(stdout, "Keys per prefix: %" PRIu64 "\n", keys_per_prefix_);
2428 fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
2429 ((static_cast<int64_t>(FLAGS_key_size + avg_value_size) * num_)
2430 / 1048576.0));
2431 fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
2432 (((FLAGS_key_size + avg_value_size * FLAGS_compression_ratio)
2433 * num_)
2434 / 1048576.0));
2435 fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
2436 FLAGS_benchmark_write_rate_limit);
2437 fprintf(stdout, "Read rate: %" PRIu64 " ops/second\n",
2438 FLAGS_benchmark_read_rate_limit);
2439 if (FLAGS_enable_numa) {
2440 fprintf(stderr, "Running in NUMA enabled mode.\n");
2441 #ifndef NUMA
2442 fprintf(stderr, "NUMA is not defined in the system.\n");
2443 exit(1);
2444 #else
2445 if (numa_available() == -1) {
2446 fprintf(stderr, "NUMA is not supported by the system.\n");
2447 exit(1);
2448 }
2449 #endif
2450 }
2451
2452 auto compression = CompressionTypeToString(FLAGS_compression_type_e);
2453 fprintf(stdout, "Compression: %s\n", compression.c_str());
2454 fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
2455 FLAGS_sample_for_compression);
2456
2457 switch (FLAGS_rep_factory) {
2458 case kPrefixHash:
2459 fprintf(stdout, "Memtablerep: prefix_hash\n");
2460 break;
2461 case kSkipList:
2462 fprintf(stdout, "Memtablerep: skip_list\n");
2463 break;
2464 case kVectorRep:
2465 fprintf(stdout, "Memtablerep: vector\n");
2466 break;
2467 case kHashLinkedList:
2468 fprintf(stdout, "Memtablerep: hash_linkedlist\n");
2469 break;
2470 }
2471 fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
2472
2473 PrintWarnings(compression.c_str());
2474 fprintf(stdout, "------------------------------------------------\n");
2475 }
2476
PrintWarnings(const char * compression)2477 void PrintWarnings(const char* compression) {
2478 #if defined(__GNUC__) && !defined(__OPTIMIZE__)
2479 fprintf(stdout,
2480 "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
2481 );
2482 #endif
2483 #ifndef NDEBUG
2484 fprintf(stdout,
2485 "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
2486 #endif
2487 if (FLAGS_compression_type_e != ROCKSDB_NAMESPACE::kNoCompression) {
2488 // The test string should not be too small.
2489 const int len = FLAGS_block_size;
2490 std::string input_str(len, 'y');
2491 std::string compressed;
2492 CompressionOptions opts;
2493 CompressionContext context(FLAGS_compression_type_e);
2494 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
2495 FLAGS_compression_type_e,
2496 FLAGS_sample_for_compression);
2497 bool result = CompressSlice(info, Slice(input_str), &compressed);
2498
2499 if (!result) {
2500 fprintf(stdout, "WARNING: %s compression is not enabled\n",
2501 compression);
2502 } else if (compressed.size() >= input_str.size()) {
2503 fprintf(stdout, "WARNING: %s compression is not effective\n",
2504 compression);
2505 }
2506 }
2507 }
2508
2509 // Current the following isn't equivalent to OS_LINUX.
2510 #if defined(__linux)
TrimSpace(Slice s)2511 static Slice TrimSpace(Slice s) {
2512 unsigned int start = 0;
2513 while (start < s.size() && isspace(s[start])) {
2514 start++;
2515 }
2516 unsigned int limit = static_cast<unsigned int>(s.size());
2517 while (limit > start && isspace(s[limit-1])) {
2518 limit--;
2519 }
2520 return Slice(s.data() + start, limit - start);
2521 }
2522 #endif
2523
PrintEnvironment()2524 void PrintEnvironment() {
2525 fprintf(stderr, "RocksDB: version %d.%d\n",
2526 kMajorVersion, kMinorVersion);
2527
2528 #if defined(__linux)
2529 time_t now = time(nullptr);
2530 char buf[52];
2531 // Lint complains about ctime() usage, so replace it with ctime_r(). The
2532 // requirement is to provide a buffer which is at least 26 bytes.
2533 fprintf(stderr, "Date: %s",
2534 ctime_r(&now, buf)); // ctime_r() adds newline
2535
2536 FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
2537 if (cpuinfo != nullptr) {
2538 char line[1000];
2539 int num_cpus = 0;
2540 std::string cpu_type;
2541 std::string cache_size;
2542 while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
2543 const char* sep = strchr(line, ':');
2544 if (sep == nullptr) {
2545 continue;
2546 }
2547 Slice key = TrimSpace(Slice(line, sep - 1 - line));
2548 Slice val = TrimSpace(Slice(sep + 1));
2549 if (key == "model name") {
2550 ++num_cpus;
2551 cpu_type = val.ToString();
2552 } else if (key == "cache size") {
2553 cache_size = val.ToString();
2554 }
2555 }
2556 fclose(cpuinfo);
2557 fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
2558 fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
2559 }
2560 #endif
2561 }
2562
KeyExpired(const TimestampEmulator * timestamp_emulator,const Slice & key)2563 static bool KeyExpired(const TimestampEmulator* timestamp_emulator,
2564 const Slice& key) {
2565 const char* pos = key.data();
2566 pos += 8;
2567 uint64_t timestamp = 0;
2568 if (port::kLittleEndian) {
2569 int bytes_to_fill = 8;
2570 for (int i = 0; i < bytes_to_fill; ++i) {
2571 timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
2572 << ((bytes_to_fill - i - 1) << 3));
2573 }
2574 } else {
2575 memcpy(×tamp, pos, sizeof(timestamp));
2576 }
2577 return timestamp_emulator->Get() - timestamp > FLAGS_time_range;
2578 }
2579
2580 class ExpiredTimeFilter : public CompactionFilter {
2581 public:
ExpiredTimeFilter(const std::shared_ptr<TimestampEmulator> & timestamp_emulator)2582 explicit ExpiredTimeFilter(
2583 const std::shared_ptr<TimestampEmulator>& timestamp_emulator)
2584 : timestamp_emulator_(timestamp_emulator) {}
Filter(int,const Slice & key,const Slice &,std::string *,bool *) const2585 bool Filter(int /*level*/, const Slice& key,
2586 const Slice& /*existing_value*/, std::string* /*new_value*/,
2587 bool* /*value_changed*/) const override {
2588 return KeyExpired(timestamp_emulator_.get(), key);
2589 }
Name() const2590 const char* Name() const override { return "ExpiredTimeFilter"; }
2591
2592 private:
2593 std::shared_ptr<TimestampEmulator> timestamp_emulator_;
2594 };
2595
2596 class KeepFilter : public CompactionFilter {
2597 public:
Filter(int,const Slice &,const Slice &,std::string *,bool *) const2598 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
2599 std::string* /*new_value*/,
2600 bool* /*value_changed*/) const override {
2601 return false;
2602 }
2603
Name() const2604 const char* Name() const override { return "KeepFilter"; }
2605 };
2606
NewCache(int64_t capacity)2607 std::shared_ptr<Cache> NewCache(int64_t capacity) {
2608 if (capacity <= 0) {
2609 return nullptr;
2610 }
2611 if (FLAGS_use_clock_cache) {
2612 auto cache = NewClockCache(static_cast<size_t>(capacity),
2613 FLAGS_cache_numshardbits);
2614 if (!cache) {
2615 fprintf(stderr, "Clock cache not supported.");
2616 exit(1);
2617 }
2618 return cache;
2619 } else {
2620 return NewLRUCache(
2621 static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
2622 false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio);
2623 }
2624 }
2625
2626 public:
Benchmark()2627 Benchmark()
2628 : cache_(NewCache(FLAGS_cache_size)),
2629 compressed_cache_(NewCache(FLAGS_compressed_cache_size)),
2630 filter_policy_(FLAGS_bloom_bits >= 0
2631 ? NewBloomFilterPolicy(FLAGS_bloom_bits,
2632 FLAGS_use_block_based_filter)
2633 : nullptr),
2634 prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
2635 num_(FLAGS_num),
2636 key_size_(FLAGS_key_size),
2637 prefix_size_(FLAGS_prefix_size),
2638 keys_per_prefix_(FLAGS_keys_per_prefix),
2639 entries_per_batch_(1),
2640 reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
2641 read_random_exp_range_(0.0),
2642 writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
2643 readwrites_(
2644 (FLAGS_writes < 0 && FLAGS_reads < 0)
2645 ? FLAGS_num
2646 : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)),
2647 merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
2648 report_file_operations_(FLAGS_report_file_operations),
2649 #ifndef ROCKSDB_LITE
2650 use_blob_db_(FLAGS_use_blob_db)
2651 #else
2652 use_blob_db_(false)
2653 #endif // !ROCKSDB_LITE
2654 {
2655 // use simcache instead of cache
2656 if (FLAGS_simcache_size >= 0) {
2657 if (FLAGS_cache_numshardbits >= 1) {
2658 cache_ =
2659 NewSimCache(cache_, FLAGS_simcache_size, FLAGS_cache_numshardbits);
2660 } else {
2661 cache_ = NewSimCache(cache_, FLAGS_simcache_size, 0);
2662 }
2663 }
2664
2665 if (report_file_operations_) {
2666 if (!FLAGS_hdfs.empty()) {
2667 fprintf(stderr,
2668 "--hdfs and --report_file_operations cannot be enabled "
2669 "at the same time");
2670 exit(1);
2671 }
2672 FLAGS_env = new ReportFileOpEnv(FLAGS_env);
2673 }
2674
2675 if (FLAGS_prefix_size > FLAGS_key_size) {
2676 fprintf(stderr, "prefix size is larger than key size");
2677 exit(1);
2678 }
2679
2680 std::vector<std::string> files;
2681 FLAGS_env->GetChildren(FLAGS_db, &files);
2682 for (size_t i = 0; i < files.size(); i++) {
2683 if (Slice(files[i]).starts_with("heap-")) {
2684 FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
2685 }
2686 }
2687 if (!FLAGS_use_existing_db) {
2688 Options options;
2689 options.env = FLAGS_env;
2690 if (!FLAGS_wal_dir.empty()) {
2691 options.wal_dir = FLAGS_wal_dir;
2692 }
2693 #ifndef ROCKSDB_LITE
2694 if (use_blob_db_) {
2695 blob_db::DestroyBlobDB(FLAGS_db, options, blob_db::BlobDBOptions());
2696 }
2697 #endif // !ROCKSDB_LITE
2698 DestroyDB(FLAGS_db, options);
2699 if (!FLAGS_wal_dir.empty()) {
2700 FLAGS_env->DeleteDir(FLAGS_wal_dir);
2701 }
2702
2703 if (FLAGS_num_multi_db > 1) {
2704 FLAGS_env->CreateDir(FLAGS_db);
2705 if (!FLAGS_wal_dir.empty()) {
2706 FLAGS_env->CreateDir(FLAGS_wal_dir);
2707 }
2708 }
2709 }
2710
2711 listener_.reset(new ErrorHandlerListener());
2712 }
2713
// Closes every open DB, frees the prefix extractor and, if a block cache
// exists, tells it to disown its entries.
~Benchmark() {
  db_.DeleteDBs();
  delete prefix_extractor_;
  if (cache_) {
    // Deliberately leaks the cached data: the process is shutting down, so
    // nobody will observe it.
    cache_->DisownData();
  }
}
2722
AllocateKey(std::unique_ptr<const char[]> * key_guard)2723 Slice AllocateKey(std::unique_ptr<const char[]>* key_guard) {
2724 char* data = new char[key_size_];
2725 const char* const_data = data;
2726 key_guard->reset(const_data);
2727 return Slice(key_guard->get(), key_size_);
2728 }
2729
2730 // Generate key according to the given specification and random number.
2731 // The resulting key will have the following format (if keys_per_prefix_
2732 // is positive), extra trailing bytes are either cut off or padded with '0'.
2733 // The prefix value is derived from key value.
2734 // ----------------------------
2735 // | prefix 00000 | key 00000 |
2736 // ----------------------------
2737 // If keys_per_prefix_ is 0, the key is simply a binary representation of
2738 // random number followed by trailing '0's
2739 // ----------------------------
2740 // | key 00000 |
2741 // ----------------------------
GenerateKeyFromInt(uint64_t v,int64_t num_keys,Slice * key)2742 void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
2743 if (!keys_.empty()) {
2744 assert(FLAGS_use_existing_keys);
2745 assert(keys_.size() == static_cast<size_t>(num_keys));
2746 assert(v < static_cast<uint64_t>(num_keys));
2747 *key = keys_[v];
2748 return;
2749 }
2750 char* start = const_cast<char*>(key->data());
2751 char* pos = start;
2752 if (keys_per_prefix_ > 0) {
2753 int64_t num_prefix = num_keys / keys_per_prefix_;
2754 int64_t prefix = v % num_prefix;
2755 int bytes_to_fill = std::min(prefix_size_, 8);
2756 if (port::kLittleEndian) {
2757 for (int i = 0; i < bytes_to_fill; ++i) {
2758 pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
2759 }
2760 } else {
2761 memcpy(pos, static_cast<void*>(&prefix), bytes_to_fill);
2762 }
2763 if (prefix_size_ > 8) {
2764 // fill the rest with 0s
2765 memset(pos + 8, '0', prefix_size_ - 8);
2766 }
2767 pos += prefix_size_;
2768 }
2769
2770 int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
2771 if (port::kLittleEndian) {
2772 for (int i = 0; i < bytes_to_fill; ++i) {
2773 pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
2774 }
2775 } else {
2776 memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
2777 }
2778 pos += bytes_to_fill;
2779 if (key_size_ > pos - start) {
2780 memset(pos, '0', key_size_ - (pos - start));
2781 }
2782 }
2783
GenerateKeyFromIntForSeek(uint64_t v,int64_t num_keys,Slice * key)2784 void GenerateKeyFromIntForSeek(uint64_t v, int64_t num_keys, Slice* key) {
2785 GenerateKeyFromInt(v, num_keys, key);
2786 if (FLAGS_seek_missing_prefix) {
2787 assert(prefix_size_ > 8);
2788 char* key_ptr = const_cast<char*>(key->data());
2789 // This rely on GenerateKeyFromInt filling paddings with '0's.
2790 // Putting a '1' will create a non-existing prefix.
2791 key_ptr[8] = '1';
2792 }
2793 }
2794
GetPathForMultiple(std::string base_name,size_t id)2795 std::string GetPathForMultiple(std::string base_name, size_t id) {
2796 if (!base_name.empty()) {
2797 #ifndef OS_WIN
2798 if (base_name.back() != '/') {
2799 base_name += '/';
2800 }
2801 #else
2802 if (base_name.back() != '\\') {
2803 base_name += '\\';
2804 }
2805 #endif
2806 }
2807 return base_name + ToString(id);
2808 }
2809
VerifyDBFromDB(std::string & truth_db_name)2810 void VerifyDBFromDB(std::string& truth_db_name) {
2811 DBWithColumnFamilies truth_db;
2812 auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db);
2813 if (!s.ok()) {
2814 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2815 exit(1);
2816 }
2817 ReadOptions ro;
2818 ro.total_order_seek = true;
2819 std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro));
2820 std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro));
2821 // Verify that all the key/values in truth_db are retrivable in db with
2822 // ::Get
2823 fprintf(stderr, "Verifying db >= truth_db with ::Get...\n");
2824 for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) {
2825 std::string value;
2826 s = db_.db->Get(ro, truth_iter->key(), &value);
2827 assert(s.ok());
2828 // TODO(myabandeh): provide debugging hints
2829 assert(Slice(value) == truth_iter->value());
2830 }
2831 // Verify that the db iterator does not give any extra key/value
2832 fprintf(stderr, "Verifying db == truth_db...\n");
2833 for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid();
2834 db_iter->Next(), truth_iter->Next()) {
2835 assert(truth_iter->Valid());
2836 assert(truth_iter->value() == db_iter->value());
2837 }
2838 // No more key should be left unchecked in truth_db
2839 assert(!truth_iter->Valid());
2840 fprintf(stderr, "...Verified\n");
2841 }
2842
Run()2843 void Run() {
2844 if (!SanityCheck()) {
2845 exit(1);
2846 }
2847 Open(&open_options_);
2848 PrintHeader();
2849 std::stringstream benchmark_stream(FLAGS_benchmarks);
2850 std::string name;
2851 std::unique_ptr<ExpiredTimeFilter> filter;
2852 while (std::getline(benchmark_stream, name, ',')) {
2853 // Sanitize parameters
2854 num_ = FLAGS_num;
2855 reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
2856 writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
2857 deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
2858 value_size = FLAGS_value_size;
2859 key_size_ = FLAGS_key_size;
2860 entries_per_batch_ = FLAGS_batch_size;
2861 writes_before_delete_range_ = FLAGS_writes_before_delete_range;
2862 writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
2863 range_tombstone_width_ = FLAGS_range_tombstone_width;
2864 max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
2865 write_options_ = WriteOptions();
2866 read_random_exp_range_ = FLAGS_read_random_exp_range;
2867 if (FLAGS_sync) {
2868 write_options_.sync = true;
2869 }
2870 write_options_.disableWAL = FLAGS_disable_wal;
2871
2872 void (Benchmark::*method)(ThreadState*) = nullptr;
2873 void (Benchmark::*post_process_method)() = nullptr;
2874
2875 bool fresh_db = false;
2876 int num_threads = FLAGS_threads;
2877
2878 int num_repeat = 1;
2879 int num_warmup = 0;
2880 if (!name.empty() && *name.rbegin() == ']') {
2881 auto it = name.find('[');
2882 if (it == std::string::npos) {
2883 fprintf(stderr, "unknown benchmark arguments '%s'\n", name.c_str());
2884 exit(1);
2885 }
2886 std::string args = name.substr(it + 1);
2887 args.resize(args.size() - 1);
2888 name.resize(it);
2889
2890 std::string bench_arg;
2891 std::stringstream args_stream(args);
2892 while (std::getline(args_stream, bench_arg, '-')) {
2893 if (bench_arg.empty()) {
2894 continue;
2895 }
2896 if (bench_arg[0] == 'X') {
2897 // Repeat the benchmark n times
2898 std::string num_str = bench_arg.substr(1);
2899 num_repeat = std::stoi(num_str);
2900 } else if (bench_arg[0] == 'W') {
2901 // Warm up the benchmark for n times
2902 std::string num_str = bench_arg.substr(1);
2903 num_warmup = std::stoi(num_str);
2904 }
2905 }
2906 }
2907
2908 // Both fillseqdeterministic and filluniquerandomdeterministic
2909 // fill the levels except the max level with UNIQUE_RANDOM
2910 // and fill the max level with fillseq and filluniquerandom, respectively
2911 if (name == "fillseqdeterministic" ||
2912 name == "filluniquerandomdeterministic") {
2913 if (!FLAGS_disable_auto_compactions) {
2914 fprintf(stderr,
2915 "Please disable_auto_compactions in FillDeterministic "
2916 "benchmark\n");
2917 exit(1);
2918 }
2919 if (num_threads > 1) {
2920 fprintf(stderr,
2921 "filldeterministic multithreaded not supported"
2922 ", use 1 thread\n");
2923 num_threads = 1;
2924 }
2925 fresh_db = true;
2926 if (name == "fillseqdeterministic") {
2927 method = &Benchmark::WriteSeqDeterministic;
2928 } else {
2929 method = &Benchmark::WriteUniqueRandomDeterministic;
2930 }
2931 } else if (name == "fillseq") {
2932 fresh_db = true;
2933 method = &Benchmark::WriteSeq;
2934 } else if (name == "fillbatch") {
2935 fresh_db = true;
2936 entries_per_batch_ = 1000;
2937 method = &Benchmark::WriteSeq;
2938 } else if (name == "fillrandom") {
2939 fresh_db = true;
2940 method = &Benchmark::WriteRandom;
2941 } else if (name == "filluniquerandom") {
2942 fresh_db = true;
2943 if (num_threads > 1) {
2944 fprintf(stderr,
2945 "filluniquerandom multithreaded not supported"
2946 ", use 1 thread");
2947 num_threads = 1;
2948 }
2949 method = &Benchmark::WriteUniqueRandom;
2950 } else if (name == "overwrite") {
2951 method = &Benchmark::WriteRandom;
2952 } else if (name == "fillsync") {
2953 fresh_db = true;
2954 num_ /= 1000;
2955 write_options_.sync = true;
2956 method = &Benchmark::WriteRandom;
2957 } else if (name == "fill100K") {
2958 fresh_db = true;
2959 num_ /= 1000;
2960 value_size = 100 * 1000;
2961 method = &Benchmark::WriteRandom;
2962 } else if (name == "readseq") {
2963 method = &Benchmark::ReadSequential;
2964 } else if (name == "readtorowcache") {
2965 if (!FLAGS_use_existing_keys || !FLAGS_row_cache_size) {
2966 fprintf(stderr,
2967 "Please set use_existing_keys to true and specify a "
2968 "row cache size in readtorowcache benchmark\n");
2969 exit(1);
2970 }
2971 method = &Benchmark::ReadToRowCache;
2972 } else if (name == "readtocache") {
2973 method = &Benchmark::ReadSequential;
2974 num_threads = 1;
2975 reads_ = num_;
2976 } else if (name == "readreverse") {
2977 method = &Benchmark::ReadReverse;
2978 } else if (name == "readrandom") {
2979 if (FLAGS_multiread_stride) {
2980 fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
2981 entries_per_batch_);
2982 }
2983 method = &Benchmark::ReadRandom;
2984 } else if (name == "readrandomfast") {
2985 method = &Benchmark::ReadRandomFast;
2986 } else if (name == "multireadrandom") {
2987 fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
2988 entries_per_batch_);
2989 method = &Benchmark::MultiReadRandom;
2990 } else if (name == "mixgraph") {
2991 method = &Benchmark::MixGraph;
2992 } else if (name == "readmissing") {
2993 ++key_size_;
2994 method = &Benchmark::ReadRandom;
2995 } else if (name == "newiterator") {
2996 method = &Benchmark::IteratorCreation;
2997 } else if (name == "newiteratorwhilewriting") {
2998 num_threads++; // Add extra thread for writing
2999 method = &Benchmark::IteratorCreationWhileWriting;
3000 } else if (name == "seekrandom") {
3001 method = &Benchmark::SeekRandom;
3002 } else if (name == "seekrandomwhilewriting") {
3003 num_threads++; // Add extra thread for writing
3004 method = &Benchmark::SeekRandomWhileWriting;
3005 } else if (name == "seekrandomwhilemerging") {
3006 num_threads++; // Add extra thread for merging
3007 method = &Benchmark::SeekRandomWhileMerging;
3008 } else if (name == "readrandomsmall") {
3009 reads_ /= 1000;
3010 method = &Benchmark::ReadRandom;
3011 } else if (name == "deleteseq") {
3012 method = &Benchmark::DeleteSeq;
3013 } else if (name == "deleterandom") {
3014 method = &Benchmark::DeleteRandom;
3015 } else if (name == "readwhilewriting") {
3016 num_threads++; // Add extra thread for writing
3017 method = &Benchmark::ReadWhileWriting;
3018 } else if (name == "readwhilemerging") {
3019 num_threads++; // Add extra thread for writing
3020 method = &Benchmark::ReadWhileMerging;
3021 } else if (name == "readwhilescanning") {
3022 num_threads++; // Add extra thread for scaning
3023 method = &Benchmark::ReadWhileScanning;
3024 } else if (name == "readrandomwriterandom") {
3025 method = &Benchmark::ReadRandomWriteRandom;
3026 } else if (name == "readrandommergerandom") {
3027 if (FLAGS_merge_operator.empty()) {
3028 fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
3029 name.c_str());
3030 exit(1);
3031 }
3032 method = &Benchmark::ReadRandomMergeRandom;
3033 } else if (name == "updaterandom") {
3034 method = &Benchmark::UpdateRandom;
3035 } else if (name == "xorupdaterandom") {
3036 method = &Benchmark::XORUpdateRandom;
3037 } else if (name == "appendrandom") {
3038 method = &Benchmark::AppendRandom;
3039 } else if (name == "mergerandom") {
3040 if (FLAGS_merge_operator.empty()) {
3041 fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
3042 name.c_str());
3043 exit(1);
3044 }
3045 method = &Benchmark::MergeRandom;
3046 } else if (name == "randomwithverify") {
3047 method = &Benchmark::RandomWithVerify;
3048 } else if (name == "fillseekseq") {
3049 method = &Benchmark::WriteSeqSeekSeq;
3050 } else if (name == "compact") {
3051 method = &Benchmark::Compact;
3052 } else if (name == "compactall") {
3053 CompactAll();
3054 } else if (name == "crc32c") {
3055 method = &Benchmark::Crc32c;
3056 } else if (name == "xxhash") {
3057 method = &Benchmark::xxHash;
3058 } else if (name == "acquireload") {
3059 method = &Benchmark::AcquireLoad;
3060 } else if (name == "compress") {
3061 method = &Benchmark::Compress;
3062 } else if (name == "uncompress") {
3063 method = &Benchmark::Uncompress;
3064 #ifndef ROCKSDB_LITE
3065 } else if (name == "randomtransaction") {
3066 method = &Benchmark::RandomTransaction;
3067 post_process_method = &Benchmark::RandomTransactionVerify;
3068 #endif // ROCKSDB_LITE
3069 } else if (name == "randomreplacekeys") {
3070 fresh_db = true;
3071 method = &Benchmark::RandomReplaceKeys;
3072 } else if (name == "timeseries") {
3073 timestamp_emulator_.reset(new TimestampEmulator());
3074 if (FLAGS_expire_style == "compaction_filter") {
3075 filter.reset(new ExpiredTimeFilter(timestamp_emulator_));
3076 fprintf(stdout, "Compaction filter is used to remove expired data");
3077 open_options_.compaction_filter = filter.get();
3078 }
3079 fresh_db = true;
3080 method = &Benchmark::TimeSeries;
3081 } else if (name == "stats") {
3082 PrintStats("rocksdb.stats");
3083 } else if (name == "resetstats") {
3084 ResetStats();
3085 } else if (name == "verify") {
3086 VerifyDBFromDB(FLAGS_truth_db);
3087 } else if (name == "levelstats") {
3088 PrintStats("rocksdb.levelstats");
3089 } else if (name == "sstables") {
3090 PrintStats("rocksdb.sstables");
3091 } else if (name == "stats_history") {
3092 PrintStatsHistory();
3093 } else if (name == "replay") {
3094 if (num_threads > 1) {
3095 fprintf(stderr, "Multi-threaded replay is not yet supported\n");
3096 exit(1);
3097 }
3098 if (FLAGS_trace_file == "") {
3099 fprintf(stderr, "Please set --trace_file to be replayed from\n");
3100 exit(1);
3101 }
3102 method = &Benchmark::Replay;
3103 } else if (name == "getmergeoperands") {
3104 method = &Benchmark::GetMergeOperands;
3105 } else if (!name.empty()) { // No error message for empty name
3106 fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
3107 exit(1);
3108 }
3109
3110 if (fresh_db) {
3111 if (FLAGS_use_existing_db) {
3112 fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
3113 name.c_str());
3114 method = nullptr;
3115 } else {
3116 if (db_.db != nullptr) {
3117 db_.DeleteDBs();
3118 DestroyDB(FLAGS_db, open_options_);
3119 }
3120 Options options = open_options_;
3121 for (size_t i = 0; i < multi_dbs_.size(); i++) {
3122 delete multi_dbs_[i].db;
3123 if (!open_options_.wal_dir.empty()) {
3124 options.wal_dir = GetPathForMultiple(open_options_.wal_dir, i);
3125 }
3126 DestroyDB(GetPathForMultiple(FLAGS_db, i), options);
3127 }
3128 multi_dbs_.clear();
3129 }
3130 Open(&open_options_); // use open_options for the last accessed
3131 }
3132
3133 if (method != nullptr) {
3134 fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
3135
3136 #ifndef ROCKSDB_LITE
3137 // A trace_file option can be provided both for trace and replay
3138 // operations. But db_bench does not support tracing and replaying at
3139 // the same time, for now. So, start tracing only when it is not a
3140 // replay.
3141 if (FLAGS_trace_file != "" && name != "replay") {
3142 std::unique_ptr<TraceWriter> trace_writer;
3143 Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
3144 FLAGS_trace_file, &trace_writer);
3145 if (!s.ok()) {
3146 fprintf(stderr, "Encountered an error starting a trace, %s\n",
3147 s.ToString().c_str());
3148 exit(1);
3149 }
3150 s = db_.db->StartTrace(trace_options_, std::move(trace_writer));
3151 if (!s.ok()) {
3152 fprintf(stderr, "Encountered an error starting a trace, %s\n",
3153 s.ToString().c_str());
3154 exit(1);
3155 }
3156 fprintf(stdout, "Tracing the workload to: [%s]\n",
3157 FLAGS_trace_file.c_str());
3158 }
3159 // Start block cache tracing.
3160 if (!FLAGS_block_cache_trace_file.empty()) {
3161 // Sanity checks.
3162 if (FLAGS_block_cache_trace_sampling_frequency <= 0) {
3163 fprintf(stderr,
3164 "Block cache trace sampling frequency must be higher than "
3165 "0.\n");
3166 exit(1);
3167 }
3168 if (FLAGS_block_cache_trace_max_trace_file_size_in_bytes <= 0) {
3169 fprintf(stderr,
3170 "The maximum file size for block cache tracing must be "
3171 "higher than 0.\n");
3172 exit(1);
3173 }
3174 block_cache_trace_options_.max_trace_file_size =
3175 FLAGS_block_cache_trace_max_trace_file_size_in_bytes;
3176 block_cache_trace_options_.sampling_frequency =
3177 FLAGS_block_cache_trace_sampling_frequency;
3178 std::unique_ptr<TraceWriter> block_cache_trace_writer;
3179 Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
3180 FLAGS_block_cache_trace_file,
3181 &block_cache_trace_writer);
3182 if (!s.ok()) {
3183 fprintf(stderr,
3184 "Encountered an error when creating trace writer, %s\n",
3185 s.ToString().c_str());
3186 exit(1);
3187 }
3188 s = db_.db->StartBlockCacheTrace(block_cache_trace_options_,
3189 std::move(block_cache_trace_writer));
3190 if (!s.ok()) {
3191 fprintf(
3192 stderr,
3193 "Encountered an error when starting block cache tracing, %s\n",
3194 s.ToString().c_str());
3195 exit(1);
3196 }
3197 fprintf(stdout, "Tracing block cache accesses to: [%s]\n",
3198 FLAGS_block_cache_trace_file.c_str());
3199 }
3200 #endif // ROCKSDB_LITE
3201
3202 if (num_warmup > 0) {
3203 printf("Warming up benchmark by running %d times\n", num_warmup);
3204 }
3205
3206 for (int i = 0; i < num_warmup; i++) {
3207 RunBenchmark(num_threads, name, method);
3208 }
3209
3210 if (num_repeat > 1) {
3211 printf("Running benchmark for %d times\n", num_repeat);
3212 }
3213
3214 CombinedStats combined_stats;
3215 for (int i = 0; i < num_repeat; i++) {
3216 Stats stats = RunBenchmark(num_threads, name, method);
3217 combined_stats.AddStats(stats);
3218 }
3219 if (num_repeat > 1) {
3220 combined_stats.Report(name);
3221 }
3222 }
3223 if (post_process_method != nullptr) {
3224 (this->*post_process_method)();
3225 }
3226 }
3227
3228 if (secondary_update_thread_) {
3229 secondary_update_stopped_.store(1, std::memory_order_relaxed);
3230 secondary_update_thread_->join();
3231 secondary_update_thread_.reset();
3232 }
3233
3234 #ifndef ROCKSDB_LITE
3235 if (name != "replay" && FLAGS_trace_file != "") {
3236 Status s = db_.db->EndTrace();
3237 if (!s.ok()) {
3238 fprintf(stderr, "Encountered an error ending the trace, %s\n",
3239 s.ToString().c_str());
3240 }
3241 }
3242 if (!FLAGS_block_cache_trace_file.empty()) {
3243 Status s = db_.db->EndBlockCacheTrace();
3244 if (!s.ok()) {
3245 fprintf(stderr,
3246 "Encountered an error ending the block cache tracing, %s\n",
3247 s.ToString().c_str());
3248 }
3249 }
3250 #endif // ROCKSDB_LITE
3251
3252 if (FLAGS_statistics) {
3253 fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
3254 }
3255 if (FLAGS_simcache_size >= 0) {
3256 fprintf(stdout, "SIMULATOR CACHE STATISTICS:\n%s\n",
3257 static_cast_with_check<SimCache, Cache>(cache_.get())
3258 ->ToString()
3259 .c_str());
3260 }
3261
3262 #ifndef ROCKSDB_LITE
3263 if (FLAGS_use_secondary_db) {
3264 fprintf(stdout, "Secondary instance updated %" PRIu64 " times.\n",
3265 secondary_db_updates_);
3266 }
3267 #endif // ROCKSDB_LITE
3268 }
3269
3270 private:
// Emulated clock for the "timeseries" benchmark; Run() creates it and shares
// it with ExpiredTimeFilter.
std::shared_ptr<TimestampEmulator> timestamp_emulator_;
// Background secondary-DB update thread. Run() joins and resets it after all
// benchmarks finish; where it is started is not visible in this section.
std::unique_ptr<port::Thread> secondary_update_thread_;
// Run() stores 1 here before joining secondary_update_thread_; presumably the
// thread polls it as its stop signal (confirm at the thread's start site).
std::atomic<int> secondary_update_stopped_{0};
#ifndef ROCKSDB_LITE
// Count printed by Run() when --use_secondary_db is set.
uint64_t secondary_db_updates_ = 0;
#endif  // ROCKSDB_LITE
// Per-thread argument that RunBenchmark() passes to ThreadBody().
struct ThreadArg {
  Benchmark* bm;           // benchmark instance whose method is invoked
  SharedState* shared;     // coordination state shared by all threads of a run
  ThreadState* thread;     // this thread's private state and stats
  void (Benchmark::*method)(ThreadState*);  // benchmark body to execute
};
3283
ThreadBody(void * v)3284 static void ThreadBody(void* v) {
3285 ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
3286 SharedState* shared = arg->shared;
3287 ThreadState* thread = arg->thread;
3288 {
3289 MutexLock l(&shared->mu);
3290 shared->num_initialized++;
3291 if (shared->num_initialized >= shared->total) {
3292 shared->cv.SignalAll();
3293 }
3294 while (!shared->start) {
3295 shared->cv.Wait();
3296 }
3297 }
3298
3299 SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
3300 perf_context.EnablePerLevelPerfContext();
3301 thread->stats.Start(thread->tid);
3302 (arg->bm->*(arg->method))(thread);
3303 thread->stats.Stop();
3304
3305 {
3306 MutexLock l(&shared->mu);
3307 shared->num_done++;
3308 if (shared->num_done >= shared->total) {
3309 shared->cv.SignalAll();
3310 }
3311 }
3312 }
3313
RunBenchmark(int n,Slice name,void (Benchmark::* method)(ThreadState *))3314 Stats RunBenchmark(int n, Slice name,
3315 void (Benchmark::*method)(ThreadState*)) {
3316 SharedState shared;
3317 shared.total = n;
3318 shared.num_initialized = 0;
3319 shared.num_done = 0;
3320 shared.start = false;
3321 if (FLAGS_benchmark_write_rate_limit > 0) {
3322 shared.write_rate_limiter.reset(
3323 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
3324 }
3325 if (FLAGS_benchmark_read_rate_limit > 0) {
3326 shared.read_rate_limiter.reset(NewGenericRateLimiter(
3327 FLAGS_benchmark_read_rate_limit, 100000 /* refill_period_us */,
3328 10 /* fairness */, RateLimiter::Mode::kReadsOnly));
3329 }
3330
3331 std::unique_ptr<ReporterAgent> reporter_agent;
3332 if (FLAGS_report_interval_seconds > 0) {
3333 reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
3334 FLAGS_report_interval_seconds));
3335 }
3336
3337 ThreadArg* arg = new ThreadArg[n];
3338
3339 for (int i = 0; i < n; i++) {
3340 #ifdef NUMA
3341 if (FLAGS_enable_numa) {
3342 // Performs a local allocation of memory to threads in numa node.
3343 int n_nodes = numa_num_task_nodes(); // Number of nodes in NUMA.
3344 numa_exit_on_error = 1;
3345 int numa_node = i % n_nodes;
3346 bitmask* nodes = numa_allocate_nodemask();
3347 numa_bitmask_clearall(nodes);
3348 numa_bitmask_setbit(nodes, numa_node);
3349 // numa_bind() call binds the process to the node and these
3350 // properties are passed on to the thread that is created in
3351 // StartThread method called later in the loop.
3352 numa_bind(nodes);
3353 numa_set_strict(1);
3354 numa_free_nodemask(nodes);
3355 }
3356 #endif
3357 arg[i].bm = this;
3358 arg[i].method = method;
3359 arg[i].shared = &shared;
3360 arg[i].thread = new ThreadState(i);
3361 arg[i].thread->stats.SetReporterAgent(reporter_agent.get());
3362 arg[i].thread->shared = &shared;
3363 FLAGS_env->StartThread(ThreadBody, &arg[i]);
3364 }
3365
3366 shared.mu.Lock();
3367 while (shared.num_initialized < n) {
3368 shared.cv.Wait();
3369 }
3370
3371 shared.start = true;
3372 shared.cv.SignalAll();
3373 while (shared.num_done < n) {
3374 shared.cv.Wait();
3375 }
3376 shared.mu.Unlock();
3377
3378 // Stats for some threads can be excluded.
3379 Stats merge_stats;
3380 for (int i = 0; i < n; i++) {
3381 merge_stats.Merge(arg[i].thread->stats);
3382 }
3383 merge_stats.Report(name);
3384
3385 for (int i = 0; i < n; i++) {
3386 delete arg[i].thread;
3387 }
3388 delete[] arg;
3389
3390 return merge_stats;
3391 }
3392
Crc32c(ThreadState * thread)3393 void Crc32c(ThreadState* thread) {
3394 // Checksum about 500MB of data total
3395 const int size = FLAGS_block_size; // use --block_size option for db_bench
3396 std::string labels = "(" + ToString(FLAGS_block_size) + " per op)";
3397 const char* label = labels.c_str();
3398
3399 std::string data(size, 'x');
3400 int64_t bytes = 0;
3401 uint32_t crc = 0;
3402 while (bytes < 500 * 1048576) {
3403 crc = crc32c::Value(data.data(), size);
3404 thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
3405 bytes += size;
3406 }
3407 // Print so result is not dead
3408 fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
3409
3410 thread->stats.AddBytes(bytes);
3411 thread->stats.AddMessage(label);
3412 }
3413
xxHash(ThreadState * thread)3414 void xxHash(ThreadState* thread) {
3415 // Checksum about 500MB of data total
3416 const int size = 4096;
3417 const char* label = "(4K per op)";
3418 std::string data(size, 'x');
3419 int64_t bytes = 0;
3420 unsigned int xxh32 = 0;
3421 while (bytes < 500 * 1048576) {
3422 xxh32 = XXH32(data.data(), size, 0);
3423 thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
3424 bytes += size;
3425 }
3426 // Print so result is not dead
3427 fprintf(stderr, "... xxh32=0x%x\r", static_cast<unsigned int>(xxh32));
3428
3429 thread->stats.AddBytes(bytes);
3430 thread->stats.AddMessage(label);
3431 }
3432
AcquireLoad(ThreadState * thread)3433 void AcquireLoad(ThreadState* thread) {
3434 int dummy;
3435 std::atomic<void*> ap(&dummy);
3436 int count = 0;
3437 void *ptr = nullptr;
3438 thread->stats.AddMessage("(each op is 1000 loads)");
3439 while (count < 100000) {
3440 for (int i = 0; i < 1000; i++) {
3441 ptr = ap.load(std::memory_order_acquire);
3442 }
3443 count++;
3444 thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
3445 }
3446 if (ptr == nullptr) exit(1); // Disable unused variable warning.
3447 }
3448
Compress(ThreadState * thread)3449 void Compress(ThreadState *thread) {
3450 RandomGenerator gen;
3451 Slice input = gen.Generate(FLAGS_block_size);
3452 int64_t bytes = 0;
3453 int64_t produced = 0;
3454 bool ok = true;
3455 std::string compressed;
3456 CompressionOptions opts;
3457 CompressionContext context(FLAGS_compression_type_e);
3458 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
3459 FLAGS_compression_type_e,
3460 FLAGS_sample_for_compression);
3461 // Compress 1G
3462 while (ok && bytes < int64_t(1) << 30) {
3463 compressed.clear();
3464 ok = CompressSlice(info, input, &compressed);
3465 produced += compressed.size();
3466 bytes += input.size();
3467 thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
3468 }
3469
3470 if (!ok) {
3471 thread->stats.AddMessage("(compression failure)");
3472 } else {
3473 char buf[340];
3474 snprintf(buf, sizeof(buf), "(output: %.1f%%)",
3475 (produced * 100.0) / bytes);
3476 thread->stats.AddMessage(buf);
3477 thread->stats.AddBytes(bytes);
3478 }
3479 }
3480
Uncompress(ThreadState * thread)3481 void Uncompress(ThreadState *thread) {
3482 RandomGenerator gen;
3483 Slice input = gen.Generate(FLAGS_block_size);
3484 std::string compressed;
3485
3486 CompressionContext compression_ctx(FLAGS_compression_type_e);
3487 CompressionOptions compression_opts;
3488 CompressionInfo compression_info(
3489 compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
3490 FLAGS_compression_type_e, FLAGS_sample_for_compression);
3491 UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
3492 UncompressionInfo uncompression_info(uncompression_ctx,
3493 UncompressionDict::GetEmptyDict(),
3494 FLAGS_compression_type_e);
3495
3496 bool ok = CompressSlice(compression_info, input, &compressed);
3497 int64_t bytes = 0;
3498 int decompress_size;
3499 while (ok && bytes < 1024 * 1048576) {
3500 CacheAllocationPtr uncompressed;
3501 switch (FLAGS_compression_type_e) {
3502 case ROCKSDB_NAMESPACE::kSnappyCompression: {
3503 // get size and allocate here to make comparison fair
3504 size_t ulength = 0;
3505 if (!Snappy_GetUncompressedLength(compressed.data(),
3506 compressed.size(), &ulength)) {
3507 ok = false;
3508 break;
3509 }
3510 uncompressed = AllocateBlock(ulength, nullptr);
3511 ok = Snappy_Uncompress(compressed.data(), compressed.size(),
3512 uncompressed.get());
3513 break;
3514 }
3515 case ROCKSDB_NAMESPACE::kZlibCompression:
3516 uncompressed =
3517 Zlib_Uncompress(uncompression_info, compressed.data(),
3518 compressed.size(), &decompress_size, 2);
3519 ok = uncompressed.get() != nullptr;
3520 break;
3521 case ROCKSDB_NAMESPACE::kBZip2Compression:
3522 uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
3523 &decompress_size, 2);
3524 ok = uncompressed.get() != nullptr;
3525 break;
3526 case ROCKSDB_NAMESPACE::kLZ4Compression:
3527 uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
3528 compressed.size(), &decompress_size, 2);
3529 ok = uncompressed.get() != nullptr;
3530 break;
3531 case ROCKSDB_NAMESPACE::kLZ4HCCompression:
3532 uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
3533 compressed.size(), &decompress_size, 2);
3534 ok = uncompressed.get() != nullptr;
3535 break;
3536 case ROCKSDB_NAMESPACE::kXpressCompression:
3537 uncompressed.reset(XPRESS_Uncompress(
3538 compressed.data(), compressed.size(), &decompress_size));
3539 ok = uncompressed.get() != nullptr;
3540 break;
3541 case ROCKSDB_NAMESPACE::kZSTD:
3542 uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
3543 compressed.size(), &decompress_size);
3544 ok = uncompressed.get() != nullptr;
3545 break;
3546 default:
3547 ok = false;
3548 }
3549 bytes += input.size();
3550 thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
3551 }
3552
3553 if (!ok) {
3554 thread->stats.AddMessage("(compression failure)");
3555 } else {
3556 thread->stats.AddBytes(bytes);
3557 }
3558 }
3559
3560 // Returns true if the options is initialized from the specified
3561 // options file.
InitializeOptionsFromFile(Options * opts)3562 bool InitializeOptionsFromFile(Options* opts) {
3563 #ifndef ROCKSDB_LITE
3564 printf("Initializing RocksDB Options from the specified file\n");
3565 DBOptions db_opts;
3566 std::vector<ColumnFamilyDescriptor> cf_descs;
3567 if (FLAGS_options_file != "") {
3568 auto s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_opts,
3569 &cf_descs);
3570 db_opts.env = FLAGS_env;
3571 if (s.ok()) {
3572 *opts = Options(db_opts, cf_descs[0].options);
3573 return true;
3574 }
3575 fprintf(stderr, "Unable to load options file %s --- %s\n",
3576 FLAGS_options_file.c_str(), s.ToString().c_str());
3577 exit(1);
3578 }
3579 #else
3580 (void)opts;
3581 #endif
3582 return false;
3583 }
3584
InitializeOptionsFromFlags(Options * opts)3585 void InitializeOptionsFromFlags(Options* opts) {
3586 printf("Initializing RocksDB Options from command-line flags\n");
3587 Options& options = *opts;
3588
3589 assert(db_.db == nullptr);
3590
3591 options.env = FLAGS_env;
3592 options.max_open_files = FLAGS_open_files;
3593 if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
3594 options.write_buffer_manager.reset(
3595 new WriteBufferManager(FLAGS_db_write_buffer_size, cache_));
3596 }
3597 options.write_buffer_size = FLAGS_write_buffer_size;
3598 options.max_write_buffer_number = FLAGS_max_write_buffer_number;
3599 options.min_write_buffer_number_to_merge =
3600 FLAGS_min_write_buffer_number_to_merge;
3601 options.max_write_buffer_number_to_maintain =
3602 FLAGS_max_write_buffer_number_to_maintain;
3603 options.max_write_buffer_size_to_maintain =
3604 FLAGS_max_write_buffer_size_to_maintain;
3605 options.max_background_jobs = FLAGS_max_background_jobs;
3606 options.max_background_compactions = FLAGS_max_background_compactions;
3607 options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
3608 options.max_background_flushes = FLAGS_max_background_flushes;
3609 options.compaction_style = FLAGS_compaction_style_e;
3610 options.compaction_pri = FLAGS_compaction_pri_e;
3611 options.allow_mmap_reads = FLAGS_mmap_read;
3612 options.allow_mmap_writes = FLAGS_mmap_write;
3613 options.use_direct_reads = FLAGS_use_direct_reads;
3614 options.use_direct_io_for_flush_and_compaction =
3615 FLAGS_use_direct_io_for_flush_and_compaction;
3616 #ifndef ROCKSDB_LITE
3617 options.ttl = FLAGS_fifo_compaction_ttl;
3618 options.compaction_options_fifo = CompactionOptionsFIFO(
3619 FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
3620 FLAGS_fifo_compaction_allow_compaction);
3621 #endif // ROCKSDB_LITE
3622 if (FLAGS_prefix_size != 0) {
3623 options.prefix_extractor.reset(
3624 NewFixedPrefixTransform(FLAGS_prefix_size));
3625 }
3626 if (FLAGS_use_uint64_comparator) {
3627 options.comparator = test::Uint64Comparator();
3628 if (FLAGS_key_size != 8) {
3629 fprintf(stderr, "Using Uint64 comparator but key size is not 8.\n");
3630 exit(1);
3631 }
3632 }
3633 if (FLAGS_use_stderr_info_logger) {
3634 options.info_log.reset(new StderrLogger());
3635 }
3636 options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
3637 options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
3638 options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
3639 if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
3640 options.memtable_insert_with_hint_prefix_extractor.reset(
3641 NewCappedPrefixTransform(
3642 FLAGS_memtable_insert_with_hint_prefix_size));
3643 }
3644 options.bloom_locality = FLAGS_bloom_locality;
3645 options.max_file_opening_threads = FLAGS_file_opening_threads;
3646 options.new_table_reader_for_compaction_inputs =
3647 FLAGS_new_table_reader_for_compaction_inputs;
3648 options.compaction_readahead_size = FLAGS_compaction_readahead_size;
3649 options.log_readahead_size = FLAGS_log_readahead_size;
3650 options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
3651 options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
3652 options.use_fsync = FLAGS_use_fsync;
3653 options.num_levels = FLAGS_num_levels;
3654 options.target_file_size_base = FLAGS_target_file_size_base;
3655 options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
3656 options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
3657 options.level_compaction_dynamic_level_bytes =
3658 FLAGS_level_compaction_dynamic_level_bytes;
3659 options.max_bytes_for_level_multiplier =
3660 FLAGS_max_bytes_for_level_multiplier;
3661 if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
3662 FLAGS_rep_factory == kHashLinkedList)) {
3663 fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "
3664 "HashLinkedList memtablerep is used\n");
3665 exit(1);
3666 }
3667 switch (FLAGS_rep_factory) {
3668 case kSkipList:
3669 options.memtable_factory.reset(new SkipListFactory(
3670 FLAGS_skip_list_lookahead));
3671 break;
3672 #ifndef ROCKSDB_LITE
3673 case kPrefixHash:
3674 options.memtable_factory.reset(
3675 NewHashSkipListRepFactory(FLAGS_hash_bucket_count));
3676 break;
3677 case kHashLinkedList:
3678 options.memtable_factory.reset(NewHashLinkListRepFactory(
3679 FLAGS_hash_bucket_count));
3680 break;
3681 case kVectorRep:
3682 options.memtable_factory.reset(
3683 new VectorRepFactory
3684 );
3685 break;
3686 #else
3687 default:
3688 fprintf(stderr, "Only skip list is supported in lite mode\n");
3689 exit(1);
3690 #endif // ROCKSDB_LITE
3691 }
3692 if (FLAGS_use_plain_table) {
3693 #ifndef ROCKSDB_LITE
3694 if (FLAGS_rep_factory != kPrefixHash &&
3695 FLAGS_rep_factory != kHashLinkedList) {
3696 fprintf(stderr, "Waring: plain table is used with skipList\n");
3697 }
3698
3699 int bloom_bits_per_key = FLAGS_bloom_bits;
3700 if (bloom_bits_per_key < 0) {
3701 bloom_bits_per_key = 0;
3702 }
3703
3704 PlainTableOptions plain_table_options;
3705 plain_table_options.user_key_len = FLAGS_key_size;
3706 plain_table_options.bloom_bits_per_key = bloom_bits_per_key;
3707 plain_table_options.hash_table_ratio = 0.75;
3708 options.table_factory = std::shared_ptr<TableFactory>(
3709 NewPlainTableFactory(plain_table_options));
3710 #else
3711 fprintf(stderr, "Plain table is not supported in lite mode\n");
3712 exit(1);
3713 #endif // ROCKSDB_LITE
3714 } else if (FLAGS_use_cuckoo_table) {
3715 #ifndef ROCKSDB_LITE
3716 if (FLAGS_cuckoo_hash_ratio > 1 || FLAGS_cuckoo_hash_ratio < 0) {
3717 fprintf(stderr, "Invalid cuckoo_hash_ratio\n");
3718 exit(1);
3719 }
3720
3721 if (!FLAGS_mmap_read) {
3722 fprintf(stderr, "cuckoo table format requires mmap read to operate\n");
3723 exit(1);
3724 }
3725
3726 ROCKSDB_NAMESPACE::CuckooTableOptions table_options;
3727 table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
3728 table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
3729 options.table_factory = std::shared_ptr<TableFactory>(
3730 NewCuckooTableFactory(table_options));
3731 #else
3732 fprintf(stderr, "Cuckoo table is not supported in lite mode\n");
3733 exit(1);
3734 #endif // ROCKSDB_LITE
3735 } else {
3736 BlockBasedTableOptions block_based_options;
3737 if (FLAGS_use_hash_search) {
3738 if (FLAGS_prefix_size == 0) {
3739 fprintf(stderr,
3740 "prefix_size not assigned when enable use_hash_search \n");
3741 exit(1);
3742 }
3743 block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
3744 } else {
3745 block_based_options.index_type = BlockBasedTableOptions::kBinarySearch;
3746 }
3747 if (FLAGS_partition_index_and_filters || FLAGS_partition_index) {
3748 if (FLAGS_use_hash_search) {
3749 fprintf(stderr,
3750 "use_hash_search is incompatible with "
3751 "partition index and is ignored");
3752 }
3753 block_based_options.index_type =
3754 BlockBasedTableOptions::kTwoLevelIndexSearch;
3755 block_based_options.metadata_block_size = FLAGS_metadata_block_size;
3756 if (FLAGS_partition_index_and_filters) {
3757 block_based_options.partition_filters = true;
3758 }
3759 }
3760 if (cache_ == nullptr) {
3761 block_based_options.no_block_cache = true;
3762 }
3763 block_based_options.cache_index_and_filter_blocks =
3764 FLAGS_cache_index_and_filter_blocks;
3765 block_based_options.pin_l0_filter_and_index_blocks_in_cache =
3766 FLAGS_pin_l0_filter_and_index_blocks_in_cache;
3767 block_based_options.pin_top_level_index_and_filter =
3768 FLAGS_pin_top_level_index_and_filter;
3769 if (FLAGS_cache_high_pri_pool_ratio > 1e-6) { // > 0.0 + eps
3770 block_based_options.cache_index_and_filter_blocks_with_high_priority =
3771 true;
3772 }
3773 block_based_options.block_cache = cache_;
3774 block_based_options.block_cache_compressed = compressed_cache_;
3775 block_based_options.block_size = FLAGS_block_size;
3776 block_based_options.block_restart_interval = FLAGS_block_restart_interval;
3777 block_based_options.index_block_restart_interval =
3778 FLAGS_index_block_restart_interval;
3779 block_based_options.filter_policy = filter_policy_;
3780 block_based_options.format_version =
3781 static_cast<uint32_t>(FLAGS_format_version);
3782 block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
3783 block_based_options.enable_index_compression =
3784 FLAGS_enable_index_compression;
3785 block_based_options.block_align = FLAGS_block_align;
3786 if (FLAGS_use_data_block_hash_index) {
3787 block_based_options.data_block_index_type =
3788 ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinaryAndHash;
3789 } else {
3790 block_based_options.data_block_index_type =
3791 ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinarySearch;
3792 }
3793 block_based_options.data_block_hash_table_util_ratio =
3794 FLAGS_data_block_hash_table_util_ratio;
3795 if (FLAGS_read_cache_path != "") {
3796 #ifndef ROCKSDB_LITE
3797 Status rc_status;
3798
3799 // Read cache need to be provided with a the Logger, we will put all
3800 // reac cache logs in the read cache path in a file named rc_LOG
3801 rc_status = FLAGS_env->CreateDirIfMissing(FLAGS_read_cache_path);
3802 std::shared_ptr<Logger> read_cache_logger;
3803 if (rc_status.ok()) {
3804 rc_status = FLAGS_env->NewLogger(FLAGS_read_cache_path + "/rc_LOG",
3805 &read_cache_logger);
3806 }
3807
3808 if (rc_status.ok()) {
3809 PersistentCacheConfig rc_cfg(FLAGS_env, FLAGS_read_cache_path,
3810 FLAGS_read_cache_size,
3811 read_cache_logger);
3812
3813 rc_cfg.enable_direct_reads = FLAGS_read_cache_direct_read;
3814 rc_cfg.enable_direct_writes = FLAGS_read_cache_direct_write;
3815 rc_cfg.writer_qdepth = 4;
3816 rc_cfg.writer_dispatch_size = 4 * 1024;
3817
3818 auto pcache = std::make_shared<BlockCacheTier>(rc_cfg);
3819 block_based_options.persistent_cache = pcache;
3820 rc_status = pcache->Open();
3821 }
3822
3823 if (!rc_status.ok()) {
3824 fprintf(stderr, "Error initializing read cache, %s\n",
3825 rc_status.ToString().c_str());
3826 exit(1);
3827 }
3828 #else
3829 fprintf(stderr, "Read cache is not supported in LITE\n");
3830 exit(1);
3831
3832 #endif
3833 }
3834 options.table_factory.reset(
3835 NewBlockBasedTableFactory(block_based_options));
3836 }
3837 if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
3838 if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
3839 static_cast<unsigned int>(FLAGS_num_levels)) {
3840 fprintf(stderr, "Insufficient number of fanouts specified %d\n",
3841 static_cast<int>(
3842 FLAGS_max_bytes_for_level_multiplier_additional_v.size()));
3843 exit(1);
3844 }
3845 options.max_bytes_for_level_multiplier_additional =
3846 FLAGS_max_bytes_for_level_multiplier_additional_v;
3847 }
3848 options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
3849 options.level0_file_num_compaction_trigger =
3850 FLAGS_level0_file_num_compaction_trigger;
3851 options.level0_slowdown_writes_trigger =
3852 FLAGS_level0_slowdown_writes_trigger;
3853 options.compression = FLAGS_compression_type_e;
3854 options.sample_for_compression = FLAGS_sample_for_compression;
3855 options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
3856 options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
3857 options.max_total_wal_size = FLAGS_max_total_wal_size;
3858
3859 if (FLAGS_min_level_to_compress >= 0) {
3860 assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
3861 options.compression_per_level.resize(FLAGS_num_levels);
3862 for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
3863 options.compression_per_level[i] = kNoCompression;
3864 }
3865 for (int i = FLAGS_min_level_to_compress;
3866 i < FLAGS_num_levels; i++) {
3867 options.compression_per_level[i] = FLAGS_compression_type_e;
3868 }
3869 }
3870 options.soft_rate_limit = FLAGS_soft_rate_limit;
3871 options.hard_rate_limit = FLAGS_hard_rate_limit;
3872 options.soft_pending_compaction_bytes_limit =
3873 FLAGS_soft_pending_compaction_bytes_limit;
3874 options.hard_pending_compaction_bytes_limit =
3875 FLAGS_hard_pending_compaction_bytes_limit;
3876 options.delayed_write_rate = FLAGS_delayed_write_rate;
3877 options.allow_concurrent_memtable_write =
3878 FLAGS_allow_concurrent_memtable_write;
3879 options.inplace_update_support = FLAGS_inplace_update_support;
3880 options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
3881 options.enable_write_thread_adaptive_yield =
3882 FLAGS_enable_write_thread_adaptive_yield;
3883 options.enable_pipelined_write = FLAGS_enable_pipelined_write;
3884 options.unordered_write = FLAGS_unordered_write;
3885 options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
3886 options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
3887 options.rate_limit_delay_max_milliseconds =
3888 FLAGS_rate_limit_delay_max_milliseconds;
3889 options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
3890 options.max_compaction_bytes = FLAGS_max_compaction_bytes;
3891 options.disable_auto_compactions = FLAGS_disable_auto_compactions;
3892 options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
3893
3894 // fill storage options
3895 options.advise_random_on_open = FLAGS_advise_random_on_open;
3896 options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
3897 options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
3898 options.bytes_per_sync = FLAGS_bytes_per_sync;
3899 options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
3900
3901 // merge operator options
3902 options.merge_operator = MergeOperators::CreateFromStringId(
3903 FLAGS_merge_operator);
3904 if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
3905 fprintf(stderr, "invalid merge operator: %s\n",
3906 FLAGS_merge_operator.c_str());
3907 exit(1);
3908 }
3909 options.max_successive_merges = FLAGS_max_successive_merges;
3910 options.report_bg_io_stats = FLAGS_report_bg_io_stats;
3911
3912 // set universal style compaction configurations, if applicable
3913 if (FLAGS_universal_size_ratio != 0) {
3914 options.compaction_options_universal.size_ratio =
3915 FLAGS_universal_size_ratio;
3916 }
3917 if (FLAGS_universal_min_merge_width != 0) {
3918 options.compaction_options_universal.min_merge_width =
3919 FLAGS_universal_min_merge_width;
3920 }
3921 if (FLAGS_universal_max_merge_width != 0) {
3922 options.compaction_options_universal.max_merge_width =
3923 FLAGS_universal_max_merge_width;
3924 }
3925 if (FLAGS_universal_max_size_amplification_percent != 0) {
3926 options.compaction_options_universal.max_size_amplification_percent =
3927 FLAGS_universal_max_size_amplification_percent;
3928 }
3929 if (FLAGS_universal_compression_size_percent != -1) {
3930 options.compaction_options_universal.compression_size_percent =
3931 FLAGS_universal_compression_size_percent;
3932 }
3933 options.compaction_options_universal.allow_trivial_move =
3934 FLAGS_universal_allow_trivial_move;
3935 if (FLAGS_thread_status_per_interval > 0) {
3936 options.enable_thread_tracking = true;
3937 }
3938
3939 #ifndef ROCKSDB_LITE
3940 if (FLAGS_readonly && FLAGS_transaction_db) {
3941 fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
3942 exit(1);
3943 }
3944 if (FLAGS_use_secondary_db &&
3945 (FLAGS_transaction_db || FLAGS_optimistic_transaction_db)) {
3946 fprintf(stderr, "Cannot use use_secondary_db flag with transaction_db\n");
3947 exit(1);
3948 }
3949 #endif // ROCKSDB_LITE
3950
3951 }
3952
InitializeOptionsGeneral(Options * opts)3953 void InitializeOptionsGeneral(Options* opts) {
3954 Options& options = *opts;
3955
3956 options.create_missing_column_families = FLAGS_num_column_families > 1;
3957 options.statistics = dbstats;
3958 options.wal_dir = FLAGS_wal_dir;
3959 options.create_if_missing = !FLAGS_use_existing_db;
3960 options.dump_malloc_stats = FLAGS_dump_malloc_stats;
3961 options.stats_dump_period_sec =
3962 static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
3963 options.stats_persist_period_sec =
3964 static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
3965 options.persist_stats_to_disk = FLAGS_persist_stats_to_disk;
3966 options.stats_history_buffer_size =
3967 static_cast<size_t>(FLAGS_stats_history_buffer_size);
3968
3969 options.compression_opts.level = FLAGS_compression_level;
3970 options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
3971 options.compression_opts.zstd_max_train_bytes =
3972 FLAGS_compression_zstd_max_train_bytes;
3973 // If this is a block based table, set some related options
3974 if (options.table_factory->Name() == BlockBasedTableFactory::kName &&
3975 options.table_factory->GetOptions() != nullptr) {
3976 BlockBasedTableOptions* table_options =
3977 reinterpret_cast<BlockBasedTableOptions*>(
3978 options.table_factory->GetOptions());
3979 if (FLAGS_cache_size) {
3980 table_options->block_cache = cache_;
3981 }
3982 if (FLAGS_bloom_bits >= 0) {
3983 table_options->filter_policy.reset(NewBloomFilterPolicy(
3984 FLAGS_bloom_bits, FLAGS_use_block_based_filter));
3985 }
3986 }
3987 if (FLAGS_row_cache_size) {
3988 if (FLAGS_cache_numshardbits >= 1) {
3989 options.row_cache =
3990 NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits);
3991 } else {
3992 options.row_cache = NewLRUCache(FLAGS_row_cache_size);
3993 }
3994 }
3995 if (FLAGS_enable_io_prio) {
3996 FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
3997 FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
3998 }
3999 if (FLAGS_enable_cpu_prio) {
4000 FLAGS_env->LowerThreadPoolCPUPriority(Env::LOW);
4001 FLAGS_env->LowerThreadPoolCPUPriority(Env::HIGH);
4002 }
4003 options.env = FLAGS_env;
4004 if (FLAGS_sine_write_rate) {
4005 FLAGS_benchmark_write_rate_limit = static_cast<uint64_t>(SineRate(0));
4006 }
4007
4008 if (FLAGS_rate_limiter_bytes_per_sec > 0) {
4009 if (FLAGS_rate_limit_bg_reads &&
4010 !FLAGS_new_table_reader_for_compaction_inputs) {
4011 fprintf(stderr,
4012 "rate limit compaction reads must have "
4013 "new_table_reader_for_compaction_inputs set\n");
4014 exit(1);
4015 }
4016 options.rate_limiter.reset(NewGenericRateLimiter(
4017 FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
4018 10 /* fairness */,
4019 FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
4020 : RateLimiter::Mode::kWritesOnly,
4021 FLAGS_rate_limiter_auto_tuned));
4022 }
4023
4024 options.listeners.emplace_back(listener_);
4025 if (FLAGS_num_multi_db <= 1) {
4026 OpenDb(options, FLAGS_db, &db_);
4027 } else {
4028 multi_dbs_.clear();
4029 multi_dbs_.resize(FLAGS_num_multi_db);
4030 auto wal_dir = options.wal_dir;
4031 for (int i = 0; i < FLAGS_num_multi_db; i++) {
4032 if (!wal_dir.empty()) {
4033 options.wal_dir = GetPathForMultiple(wal_dir, i);
4034 }
4035 OpenDb(options, GetPathForMultiple(FLAGS_db, i), &multi_dbs_[i]);
4036 }
4037 options.wal_dir = wal_dir;
4038 }
4039
4040 // KeepFilter is a noop filter, this can be used to test compaction filter
4041 if (FLAGS_use_keep_filter) {
4042 options.compaction_filter = new KeepFilter();
4043 fprintf(stdout, "A noop compaction filter is used\n");
4044 }
4045
4046 if (FLAGS_use_existing_keys) {
4047 // Only work on single database
4048 assert(db_.db != nullptr);
4049 ReadOptions read_opts;
4050 read_opts.total_order_seek = true;
4051 Iterator* iter = db_.db->NewIterator(read_opts);
4052 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
4053 keys_.emplace_back(iter->key().ToString());
4054 }
4055 delete iter;
4056 FLAGS_num = keys_.size();
4057 }
4058 }
4059
Open(Options * opts)4060 void Open(Options* opts) {
4061 if (!InitializeOptionsFromFile(opts)) {
4062 InitializeOptionsFromFlags(opts);
4063 }
4064
4065 InitializeOptionsGeneral(opts);
4066 }
4067
OpenDb(Options options,const std::string & db_name,DBWithColumnFamilies * db)4068 void OpenDb(Options options, const std::string& db_name,
4069 DBWithColumnFamilies* db) {
4070 Status s;
4071 // Open with column families if necessary.
4072 if (FLAGS_num_column_families > 1) {
4073 size_t num_hot = FLAGS_num_column_families;
4074 if (FLAGS_num_hot_column_families > 0 &&
4075 FLAGS_num_hot_column_families < FLAGS_num_column_families) {
4076 num_hot = FLAGS_num_hot_column_families;
4077 } else {
4078 FLAGS_num_hot_column_families = FLAGS_num_column_families;
4079 }
4080 std::vector<ColumnFamilyDescriptor> column_families;
4081 for (size_t i = 0; i < num_hot; i++) {
4082 column_families.push_back(ColumnFamilyDescriptor(
4083 ColumnFamilyName(i), ColumnFamilyOptions(options)));
4084 }
4085 std::vector<int> cfh_idx_to_prob;
4086 if (!FLAGS_column_family_distribution.empty()) {
4087 std::stringstream cf_prob_stream(FLAGS_column_family_distribution);
4088 std::string cf_prob;
4089 int sum = 0;
4090 while (std::getline(cf_prob_stream, cf_prob, ',')) {
4091 cfh_idx_to_prob.push_back(std::stoi(cf_prob));
4092 sum += cfh_idx_to_prob.back();
4093 }
4094 if (sum != 100) {
4095 fprintf(stderr, "column_family_distribution items must sum to 100\n");
4096 exit(1);
4097 }
4098 if (cfh_idx_to_prob.size() != num_hot) {
4099 fprintf(stderr,
4100 "got %" ROCKSDB_PRIszt
4101 " column_family_distribution items; expected "
4102 "%" ROCKSDB_PRIszt "\n",
4103 cfh_idx_to_prob.size(), num_hot);
4104 exit(1);
4105 }
4106 }
4107 #ifndef ROCKSDB_LITE
4108 if (FLAGS_readonly) {
4109 s = DB::OpenForReadOnly(options, db_name, column_families,
4110 &db->cfh, &db->db);
4111 } else if (FLAGS_optimistic_transaction_db) {
4112 s = OptimisticTransactionDB::Open(options, db_name, column_families,
4113 &db->cfh, &db->opt_txn_db);
4114 if (s.ok()) {
4115 db->db = db->opt_txn_db->GetBaseDB();
4116 }
4117 } else if (FLAGS_transaction_db) {
4118 TransactionDB* ptr;
4119 TransactionDBOptions txn_db_options;
4120 if (options.unordered_write) {
4121 options.two_write_queues = true;
4122 txn_db_options.skip_concurrency_control = true;
4123 txn_db_options.write_policy = WRITE_PREPARED;
4124 }
4125 s = TransactionDB::Open(options, txn_db_options, db_name,
4126 column_families, &db->cfh, &ptr);
4127 if (s.ok()) {
4128 db->db = ptr;
4129 }
4130 } else {
4131 s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
4132 }
4133 #else
4134 s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
4135 #endif // ROCKSDB_LITE
4136 db->cfh.resize(FLAGS_num_column_families);
4137 db->num_created = num_hot;
4138 db->num_hot = num_hot;
4139 db->cfh_idx_to_prob = std::move(cfh_idx_to_prob);
4140 #ifndef ROCKSDB_LITE
4141 } else if (FLAGS_readonly) {
4142 s = DB::OpenForReadOnly(options, db_name, &db->db);
4143 } else if (FLAGS_optimistic_transaction_db) {
4144 s = OptimisticTransactionDB::Open(options, db_name, &db->opt_txn_db);
4145 if (s.ok()) {
4146 db->db = db->opt_txn_db->GetBaseDB();
4147 }
4148 } else if (FLAGS_transaction_db) {
4149 TransactionDB* ptr = nullptr;
4150 TransactionDBOptions txn_db_options;
4151 if (options.unordered_write) {
4152 options.two_write_queues = true;
4153 txn_db_options.skip_concurrency_control = true;
4154 txn_db_options.write_policy = WRITE_PREPARED;
4155 }
4156 s = CreateLoggerFromOptions(db_name, options, &options.info_log);
4157 if (s.ok()) {
4158 s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
4159 }
4160 if (s.ok()) {
4161 db->db = ptr;
4162 }
4163 } else if (FLAGS_use_blob_db) {
4164 blob_db::BlobDBOptions blob_db_options;
4165 blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
4166 blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
4167 blob_db_options.is_fifo = FLAGS_blob_db_is_fifo;
4168 blob_db_options.max_db_size = FLAGS_blob_db_max_db_size;
4169 blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs;
4170 blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
4171 blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
4172 blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
4173 blob_db_options.compression = FLAGS_blob_db_compression_type_e;
4174 blob_db::BlobDB* ptr = nullptr;
4175 s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
4176 if (s.ok()) {
4177 db->db = ptr;
4178 }
4179 } else if (FLAGS_use_secondary_db) {
4180 if (FLAGS_secondary_path.empty()) {
4181 std::string default_secondary_path;
4182 FLAGS_env->GetTestDirectory(&default_secondary_path);
4183 default_secondary_path += "/dbbench_secondary";
4184 FLAGS_secondary_path = default_secondary_path;
4185 }
4186 s = DB::OpenAsSecondary(options, db_name, FLAGS_secondary_path, &db->db);
4187 if (s.ok() && FLAGS_secondary_update_interval > 0) {
4188 secondary_update_thread_.reset(new port::Thread(
4189 [this](int interval, DBWithColumnFamilies* _db) {
4190 while (0 == secondary_update_stopped_.load(
4191 std::memory_order_relaxed)) {
4192 Status secondary_update_status =
4193 _db->db->TryCatchUpWithPrimary();
4194 if (!secondary_update_status.ok()) {
4195 fprintf(stderr, "Failed to catch up with primary: %s\n",
4196 secondary_update_status.ToString().c_str());
4197 break;
4198 }
4199 ++secondary_db_updates_;
4200 FLAGS_env->SleepForMicroseconds(interval * 1000000);
4201 }
4202 },
4203 FLAGS_secondary_update_interval, db));
4204 }
4205 #endif // ROCKSDB_LITE
4206 } else {
4207 s = DB::Open(options, db_name, &db->db);
4208 }
4209 if (!s.ok()) {
4210 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
4211 exit(1);
4212 }
4213 }
4214
// Order in which the write benchmarks pick key indices (see KeyGenerator).
enum WriteMode {
  RANDOM,        // uniformly random index; the same key may repeat
  SEQUENTIAL,    // increasing indices
  UNIQUE_RANDOM  // every index exactly once, in shuffled order
};
4218
WriteSeqDeterministic(ThreadState * thread)4219 void WriteSeqDeterministic(ThreadState* thread) {
4220 DoDeterministicCompact(thread, open_options_.compaction_style, SEQUENTIAL);
4221 }
4222
WriteUniqueRandomDeterministic(ThreadState * thread)4223 void WriteUniqueRandomDeterministic(ThreadState* thread) {
4224 DoDeterministicCompact(thread, open_options_.compaction_style,
4225 UNIQUE_RANDOM);
4226 }
4227
WriteSeq(ThreadState * thread)4228 void WriteSeq(ThreadState* thread) {
4229 DoWrite(thread, SEQUENTIAL);
4230 }
4231
WriteRandom(ThreadState * thread)4232 void WriteRandom(ThreadState* thread) {
4233 DoWrite(thread, RANDOM);
4234 }
4235
WriteUniqueRandom(ThreadState * thread)4236 void WriteUniqueRandom(ThreadState* thread) {
4237 DoWrite(thread, UNIQUE_RANDOM);
4238 }
4239
4240 class KeyGenerator {
4241 public:
KeyGenerator(Random64 * rand,WriteMode mode,uint64_t num,uint64_t=64* 1024)4242 KeyGenerator(Random64* rand, WriteMode mode, uint64_t num,
4243 uint64_t /*num_per_set*/ = 64 * 1024)
4244 : rand_(rand), mode_(mode), num_(num), next_(0) {
4245 if (mode_ == UNIQUE_RANDOM) {
4246 // NOTE: if memory consumption of this approach becomes a concern,
4247 // we can either break it into pieces and only random shuffle a section
4248 // each time. Alternatively, use a bit map implementation
4249 // (https://reviews.facebook.net/differential/diff/54627/)
4250 values_.resize(num_);
4251 for (uint64_t i = 0; i < num_; ++i) {
4252 values_[i] = i;
4253 }
4254 std::shuffle(
4255 values_.begin(), values_.end(),
4256 std::default_random_engine(static_cast<unsigned int>(FLAGS_seed)));
4257 }
4258 }
4259
Next()4260 uint64_t Next() {
4261 switch (mode_) {
4262 case SEQUENTIAL:
4263 return next_++;
4264 case RANDOM:
4265 return rand_->Next() % num_;
4266 case UNIQUE_RANDOM:
4267 assert(next_ < num_);
4268 return values_[next_++];
4269 }
4270 assert(false);
4271 return std::numeric_limits<uint64_t>::max();
4272 }
4273
4274 private:
4275 Random64* rand_;
4276 WriteMode mode_;
4277 const uint64_t num_;
4278 uint64_t next_;
4279 std::vector<uint64_t> values_;
4280 };
4281
SelectDB(ThreadState * thread)4282 DB* SelectDB(ThreadState* thread) {
4283 return SelectDBWithCfh(thread)->db;
4284 }
4285
SelectDBWithCfh(ThreadState * thread)4286 DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
4287 return SelectDBWithCfh(thread->rand.Next());
4288 }
4289
SelectDBWithCfh(uint64_t rand_int)4290 DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
4291 if (db_.db != nullptr) {
4292 return &db_;
4293 } else {
4294 return &multi_dbs_[rand_int % multi_dbs_.size()];
4295 }
4296 }
4297
SineRate(double x)4298 double SineRate(double x) {
4299 return FLAGS_sine_a*sin((FLAGS_sine_b*x) + FLAGS_sine_c) + FLAGS_sine_d;
4300 }
4301
DoWrite(ThreadState * thread,WriteMode write_mode)4302 void DoWrite(ThreadState* thread, WriteMode write_mode) {
4303 const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
4304 const int64_t num_ops = writes_ == 0 ? num_ : writes_;
4305
4306 size_t num_key_gens = 1;
4307 if (db_.db == nullptr) {
4308 num_key_gens = multi_dbs_.size();
4309 }
4310 std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
4311 int64_t max_ops = num_ops * num_key_gens;
4312 int64_t ops_per_stage = max_ops;
4313 if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
4314 ops_per_stage = (max_ops - 1) / (FLAGS_num_column_families /
4315 FLAGS_num_hot_column_families) +
4316 1;
4317 }
4318
4319 Duration duration(test_duration, max_ops, ops_per_stage);
4320 for (size_t i = 0; i < num_key_gens; i++) {
4321 key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode,
4322 num_ + max_num_range_tombstones_,
4323 ops_per_stage));
4324 }
4325
4326 if (num_ != FLAGS_num) {
4327 char msg[100];
4328 snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
4329 thread->stats.AddMessage(msg);
4330 }
4331
4332 RandomGenerator gen;
4333 WriteBatch batch;
4334 Status s;
4335 int64_t bytes = 0;
4336
4337 std::unique_ptr<const char[]> key_guard;
4338 Slice key = AllocateKey(&key_guard);
4339 std::unique_ptr<const char[]> begin_key_guard;
4340 Slice begin_key = AllocateKey(&begin_key_guard);
4341 std::unique_ptr<const char[]> end_key_guard;
4342 Slice end_key = AllocateKey(&end_key_guard);
4343 std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
4344 std::vector<Slice> expanded_keys;
4345 if (FLAGS_expand_range_tombstones) {
4346 expanded_key_guards.resize(range_tombstone_width_);
4347 for (auto& expanded_key_guard : expanded_key_guards) {
4348 expanded_keys.emplace_back(AllocateKey(&expanded_key_guard));
4349 }
4350 }
4351
4352 int64_t stage = 0;
4353 int64_t num_written = 0;
4354 while (!duration.Done(entries_per_batch_)) {
4355 if (duration.GetStage() != stage) {
4356 stage = duration.GetStage();
4357 if (db_.db != nullptr) {
4358 db_.CreateNewCf(open_options_, stage);
4359 } else {
4360 for (auto& db : multi_dbs_) {
4361 db.CreateNewCf(open_options_, stage);
4362 }
4363 }
4364 }
4365
4366 size_t id = thread->rand.Next() % num_key_gens;
4367 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
4368 batch.Clear();
4369 int64_t batch_bytes = 0;
4370
4371 for (int64_t j = 0; j < entries_per_batch_; j++) {
4372 int64_t rand_num = key_gens[id]->Next();
4373 GenerateKeyFromInt(rand_num, FLAGS_num, &key);
4374 Slice val = gen.Generate();
4375 if (use_blob_db_) {
4376 #ifndef ROCKSDB_LITE
4377 blob_db::BlobDB* blobdb =
4378 static_cast<blob_db::BlobDB*>(db_with_cfh->db);
4379 if (FLAGS_blob_db_max_ttl_range > 0) {
4380 int ttl = rand() % FLAGS_blob_db_max_ttl_range;
4381 s = blobdb->PutWithTTL(write_options_, key, val, ttl);
4382 } else {
4383 s = blobdb->Put(write_options_, key, val);
4384 }
4385 #endif // ROCKSDB_LITE
4386 } else if (FLAGS_num_column_families <= 1) {
4387 batch.Put(key, val);
4388 } else {
4389 // We use same rand_num as seed for key and column family so that we
4390 // can deterministically find the cfh corresponding to a particular
4391 // key while reading the key.
4392 batch.Put(db_with_cfh->GetCfh(rand_num), key,
4393 val);
4394 }
4395 batch_bytes += val.size() + key_size_;
4396 bytes += val.size() + key_size_;
4397 ++num_written;
4398 if (writes_per_range_tombstone_ > 0 &&
4399 num_written > writes_before_delete_range_ &&
4400 (num_written - writes_before_delete_range_) /
4401 writes_per_range_tombstone_ <=
4402 max_num_range_tombstones_ &&
4403 (num_written - writes_before_delete_range_) %
4404 writes_per_range_tombstone_ ==
4405 0) {
4406 int64_t begin_num = key_gens[id]->Next();
4407 if (FLAGS_expand_range_tombstones) {
4408 for (int64_t offset = 0; offset < range_tombstone_width_;
4409 ++offset) {
4410 GenerateKeyFromInt(begin_num + offset, FLAGS_num,
4411 &expanded_keys[offset]);
4412 if (use_blob_db_) {
4413 #ifndef ROCKSDB_LITE
4414 s = db_with_cfh->db->Delete(write_options_,
4415 expanded_keys[offset]);
4416 #endif // ROCKSDB_LITE
4417 } else if (FLAGS_num_column_families <= 1) {
4418 batch.Delete(expanded_keys[offset]);
4419 } else {
4420 batch.Delete(db_with_cfh->GetCfh(rand_num),
4421 expanded_keys[offset]);
4422 }
4423 }
4424 } else {
4425 GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key);
4426 GenerateKeyFromInt(begin_num + range_tombstone_width_, FLAGS_num,
4427 &end_key);
4428 if (use_blob_db_) {
4429 #ifndef ROCKSDB_LITE
4430 s = db_with_cfh->db->DeleteRange(
4431 write_options_, db_with_cfh->db->DefaultColumnFamily(),
4432 begin_key, end_key);
4433 #endif // ROCKSDB_LITE
4434 } else if (FLAGS_num_column_families <= 1) {
4435 batch.DeleteRange(begin_key, end_key);
4436 } else {
4437 batch.DeleteRange(db_with_cfh->GetCfh(rand_num), begin_key,
4438 end_key);
4439 }
4440 }
4441 }
4442 }
4443 if (thread->shared->write_rate_limiter.get() != nullptr) {
4444 thread->shared->write_rate_limiter->Request(
4445 batch_bytes, Env::IO_HIGH,
4446 nullptr /* stats */, RateLimiter::OpType::kWrite);
4447 // Set time at which last op finished to Now() to hide latency and
4448 // sleep from rate limiter. Also, do the check once per batch, not
4449 // once per write.
4450 thread->stats.ResetLastOpTime();
4451 }
4452 if (!use_blob_db_) {
4453 s = db_with_cfh->db->Write(write_options_, &batch);
4454 }
4455 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
4456 entries_per_batch_, kWrite);
4457 if (FLAGS_sine_write_rate) {
4458 uint64_t now = FLAGS_env->NowMicros();
4459
4460 uint64_t usecs_since_last;
4461 if (now > thread->stats.GetSineInterval()) {
4462 usecs_since_last = now - thread->stats.GetSineInterval();
4463 } else {
4464 usecs_since_last = 0;
4465 }
4466
4467 if (usecs_since_last >
4468 (FLAGS_sine_write_rate_interval_milliseconds * uint64_t{1000})) {
4469 double usecs_since_start =
4470 static_cast<double>(now - thread->stats.GetStart());
4471 thread->stats.ResetSineInterval();
4472 uint64_t write_rate =
4473 static_cast<uint64_t>(SineRate(usecs_since_start / 1000000.0));
4474 thread->shared->write_rate_limiter.reset(
4475 NewGenericRateLimiter(write_rate));
4476 }
4477 }
4478 if (!s.ok()) {
4479 s = listener_->WaitForRecovery(600000000) ? Status::OK() : s;
4480 }
4481
4482 if (!s.ok()) {
4483 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
4484 exit(1);
4485 }
4486 }
4487 thread->stats.AddBytes(bytes);
4488 }
4489
DoDeterministicCompact(ThreadState * thread,CompactionStyle compaction_style,WriteMode write_mode)4490 Status DoDeterministicCompact(ThreadState* thread,
4491 CompactionStyle compaction_style,
4492 WriteMode write_mode) {
4493 #ifndef ROCKSDB_LITE
4494 ColumnFamilyMetaData meta;
4495 std::vector<DB*> db_list;
4496 if (db_.db != nullptr) {
4497 db_list.push_back(db_.db);
4498 } else {
4499 for (auto& db : multi_dbs_) {
4500 db_list.push_back(db.db);
4501 }
4502 }
4503 std::vector<Options> options_list;
4504 for (auto db : db_list) {
4505 options_list.push_back(db->GetOptions());
4506 if (compaction_style != kCompactionStyleFIFO) {
4507 db->SetOptions({{"disable_auto_compactions", "1"},
4508 {"level0_slowdown_writes_trigger", "400000000"},
4509 {"level0_stop_writes_trigger", "400000000"}});
4510 } else {
4511 db->SetOptions({{"disable_auto_compactions", "1"}});
4512 }
4513 }
4514
4515 assert(!db_list.empty());
4516 auto num_db = db_list.size();
4517 size_t num_levels = static_cast<size_t>(open_options_.num_levels);
4518 size_t output_level = open_options_.num_levels - 1;
4519 std::vector<std::vector<std::vector<SstFileMetaData>>> sorted_runs(num_db);
4520 std::vector<size_t> num_files_at_level0(num_db, 0);
4521 if (compaction_style == kCompactionStyleLevel) {
4522 if (num_levels == 0) {
4523 return Status::InvalidArgument("num_levels should be larger than 1");
4524 }
4525 bool should_stop = false;
4526 while (!should_stop) {
4527 if (sorted_runs[0].empty()) {
4528 DoWrite(thread, write_mode);
4529 } else {
4530 DoWrite(thread, UNIQUE_RANDOM);
4531 }
4532 for (size_t i = 0; i < num_db; i++) {
4533 auto db = db_list[i];
4534 db->Flush(FlushOptions());
4535 db->GetColumnFamilyMetaData(&meta);
4536 if (num_files_at_level0[i] == meta.levels[0].files.size() ||
4537 writes_ == 0) {
4538 should_stop = true;
4539 continue;
4540 }
4541 sorted_runs[i].emplace_back(
4542 meta.levels[0].files.begin(),
4543 meta.levels[0].files.end() - num_files_at_level0[i]);
4544 num_files_at_level0[i] = meta.levels[0].files.size();
4545 if (sorted_runs[i].back().size() == 1) {
4546 should_stop = true;
4547 continue;
4548 }
4549 if (sorted_runs[i].size() == output_level) {
4550 auto& L1 = sorted_runs[i].back();
4551 L1.erase(L1.begin(), L1.begin() + L1.size() / 3);
4552 should_stop = true;
4553 continue;
4554 }
4555 }
4556 writes_ /= static_cast<int64_t>(open_options_.max_bytes_for_level_multiplier);
4557 }
4558 for (size_t i = 0; i < num_db; i++) {
4559 if (sorted_runs[i].size() < num_levels - 1) {
4560 fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
4561 exit(1);
4562 }
4563 }
4564 for (size_t i = 0; i < num_db; i++) {
4565 auto db = db_list[i];
4566 auto compactionOptions = CompactionOptions();
4567 compactionOptions.compression = FLAGS_compression_type_e;
4568 auto options = db->GetOptions();
4569 MutableCFOptions mutable_cf_options(options);
4570 for (size_t j = 0; j < sorted_runs[i].size(); j++) {
4571 compactionOptions.output_file_size_limit =
4572 MaxFileSizeForLevel(mutable_cf_options,
4573 static_cast<int>(output_level), compaction_style);
4574 std::cout << sorted_runs[i][j].size() << std::endl;
4575 db->CompactFiles(compactionOptions, {sorted_runs[i][j].back().name,
4576 sorted_runs[i][j].front().name},
4577 static_cast<int>(output_level - j) /*level*/);
4578 }
4579 }
4580 } else if (compaction_style == kCompactionStyleUniversal) {
4581 auto ratio = open_options_.compaction_options_universal.size_ratio;
4582 bool should_stop = false;
4583 while (!should_stop) {
4584 if (sorted_runs[0].empty()) {
4585 DoWrite(thread, write_mode);
4586 } else {
4587 DoWrite(thread, UNIQUE_RANDOM);
4588 }
4589 for (size_t i = 0; i < num_db; i++) {
4590 auto db = db_list[i];
4591 db->Flush(FlushOptions());
4592 db->GetColumnFamilyMetaData(&meta);
4593 if (num_files_at_level0[i] == meta.levels[0].files.size() ||
4594 writes_ == 0) {
4595 should_stop = true;
4596 continue;
4597 }
4598 sorted_runs[i].emplace_back(
4599 meta.levels[0].files.begin(),
4600 meta.levels[0].files.end() - num_files_at_level0[i]);
4601 num_files_at_level0[i] = meta.levels[0].files.size();
4602 if (sorted_runs[i].back().size() == 1) {
4603 should_stop = true;
4604 continue;
4605 }
4606 num_files_at_level0[i] = meta.levels[0].files.size();
4607 }
4608 writes_ = static_cast<int64_t>(writes_* static_cast<double>(100) / (ratio + 200));
4609 }
4610 for (size_t i = 0; i < num_db; i++) {
4611 if (sorted_runs[i].size() < num_levels) {
4612 fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
4613 exit(1);
4614 }
4615 }
4616 for (size_t i = 0; i < num_db; i++) {
4617 auto db = db_list[i];
4618 auto compactionOptions = CompactionOptions();
4619 compactionOptions.compression = FLAGS_compression_type_e;
4620 auto options = db->GetOptions();
4621 MutableCFOptions mutable_cf_options(options);
4622 for (size_t j = 0; j < sorted_runs[i].size(); j++) {
4623 compactionOptions.output_file_size_limit =
4624 MaxFileSizeForLevel(mutable_cf_options,
4625 static_cast<int>(output_level), compaction_style);
4626 db->CompactFiles(
4627 compactionOptions,
4628 {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
4629 (output_level > j ? static_cast<int>(output_level - j)
4630 : 0) /*level*/);
4631 }
4632 }
4633 } else if (compaction_style == kCompactionStyleFIFO) {
4634 if (num_levels != 1) {
4635 return Status::InvalidArgument(
4636 "num_levels should be 1 for FIFO compaction");
4637 }
4638 if (FLAGS_num_multi_db != 0) {
4639 return Status::InvalidArgument("Doesn't support multiDB");
4640 }
4641 auto db = db_list[0];
4642 std::vector<std::string> file_names;
4643 while (true) {
4644 if (sorted_runs[0].empty()) {
4645 DoWrite(thread, write_mode);
4646 } else {
4647 DoWrite(thread, UNIQUE_RANDOM);
4648 }
4649 db->Flush(FlushOptions());
4650 db->GetColumnFamilyMetaData(&meta);
4651 auto total_size = meta.levels[0].size;
4652 if (total_size >=
4653 db->GetOptions().compaction_options_fifo.max_table_files_size) {
4654 for (auto file_meta : meta.levels[0].files) {
4655 file_names.emplace_back(file_meta.name);
4656 }
4657 break;
4658 }
4659 }
4660 // TODO(shuzhang1989): Investigate why CompactFiles not working
4661 // auto compactionOptions = CompactionOptions();
4662 // db->CompactFiles(compactionOptions, file_names, 0);
4663 auto compactionOptions = CompactRangeOptions();
4664 db->CompactRange(compactionOptions, nullptr, nullptr);
4665 } else {
4666 fprintf(stdout,
4667 "%-12s : skipped (-compaction_stype=kCompactionStyleNone)\n",
4668 "filldeterministic");
4669 return Status::InvalidArgument("None compaction is not supported");
4670 }
4671
4672 // Verify seqno and key range
4673 // Note: the seqno get changed at the max level by implementation
4674 // optimization, so skip the check of the max level.
4675 #ifndef NDEBUG
4676 for (size_t k = 0; k < num_db; k++) {
4677 auto db = db_list[k];
4678 db->GetColumnFamilyMetaData(&meta);
4679 // verify the number of sorted runs
4680 if (compaction_style == kCompactionStyleLevel) {
4681 assert(num_levels - 1 == sorted_runs[k].size());
4682 } else if (compaction_style == kCompactionStyleUniversal) {
4683 assert(meta.levels[0].files.size() + num_levels - 1 ==
4684 sorted_runs[k].size());
4685 } else if (compaction_style == kCompactionStyleFIFO) {
4686 // TODO(gzh): FIFO compaction
4687 db->GetColumnFamilyMetaData(&meta);
4688 auto total_size = meta.levels[0].size;
4689 assert(total_size <=
4690 db->GetOptions().compaction_options_fifo.max_table_files_size);
4691 break;
4692 }
4693
4694 // verify smallest/largest seqno and key range of each sorted run
4695 auto max_level = num_levels - 1;
4696 int level;
4697 for (size_t i = 0; i < sorted_runs[k].size(); i++) {
4698 level = static_cast<int>(max_level - i);
4699 SequenceNumber sorted_run_smallest_seqno = kMaxSequenceNumber;
4700 SequenceNumber sorted_run_largest_seqno = 0;
4701 std::string sorted_run_smallest_key, sorted_run_largest_key;
4702 bool first_key = true;
4703 for (auto fileMeta : sorted_runs[k][i]) {
4704 sorted_run_smallest_seqno =
4705 std::min(sorted_run_smallest_seqno, fileMeta.smallest_seqno);
4706 sorted_run_largest_seqno =
4707 std::max(sorted_run_largest_seqno, fileMeta.largest_seqno);
4708 if (first_key ||
4709 db->DefaultColumnFamily()->GetComparator()->Compare(
4710 fileMeta.smallestkey, sorted_run_smallest_key) < 0) {
4711 sorted_run_smallest_key = fileMeta.smallestkey;
4712 }
4713 if (first_key ||
4714 db->DefaultColumnFamily()->GetComparator()->Compare(
4715 fileMeta.largestkey, sorted_run_largest_key) > 0) {
4716 sorted_run_largest_key = fileMeta.largestkey;
4717 }
4718 first_key = false;
4719 }
4720 if (compaction_style == kCompactionStyleLevel ||
4721 (compaction_style == kCompactionStyleUniversal && level > 0)) {
4722 SequenceNumber level_smallest_seqno = kMaxSequenceNumber;
4723 SequenceNumber level_largest_seqno = 0;
4724 for (auto fileMeta : meta.levels[level].files) {
4725 level_smallest_seqno =
4726 std::min(level_smallest_seqno, fileMeta.smallest_seqno);
4727 level_largest_seqno =
4728 std::max(level_largest_seqno, fileMeta.largest_seqno);
4729 }
4730 assert(sorted_run_smallest_key ==
4731 meta.levels[level].files.front().smallestkey);
4732 assert(sorted_run_largest_key ==
4733 meta.levels[level].files.back().largestkey);
4734 if (level != static_cast<int>(max_level)) {
4735 // compaction at max_level would change sequence number
4736 assert(sorted_run_smallest_seqno == level_smallest_seqno);
4737 assert(sorted_run_largest_seqno == level_largest_seqno);
4738 }
4739 } else if (compaction_style == kCompactionStyleUniversal) {
4740 // level <= 0 means sorted runs on level 0
4741 auto level0_file =
4742 meta.levels[0].files[sorted_runs[k].size() - 1 - i];
4743 assert(sorted_run_smallest_key == level0_file.smallestkey);
4744 assert(sorted_run_largest_key == level0_file.largestkey);
4745 if (level != static_cast<int>(max_level)) {
4746 assert(sorted_run_smallest_seqno == level0_file.smallest_seqno);
4747 assert(sorted_run_largest_seqno == level0_file.largest_seqno);
4748 }
4749 }
4750 }
4751 }
4752 #endif
4753 // print the size of each sorted_run
4754 for (size_t k = 0; k < num_db; k++) {
4755 auto db = db_list[k];
4756 fprintf(stdout,
4757 "---------------------- DB %" ROCKSDB_PRIszt " LSM ---------------------\n", k);
4758 db->GetColumnFamilyMetaData(&meta);
4759 for (auto& levelMeta : meta.levels) {
4760 if (levelMeta.files.empty()) {
4761 continue;
4762 }
4763 if (levelMeta.level == 0) {
4764 for (auto& fileMeta : levelMeta.files) {
4765 fprintf(stdout, "Level[%d]: %s(size: %" ROCKSDB_PRIszt " bytes)\n",
4766 levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
4767 }
4768 } else {
4769 fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIi64 " bytes)\n",
4770 levelMeta.level, levelMeta.files.front().name.c_str(),
4771 levelMeta.files.back().name.c_str(), levelMeta.size);
4772 }
4773 }
4774 }
4775 for (size_t i = 0; i < num_db; i++) {
4776 db_list[i]->SetOptions(
4777 {{"disable_auto_compactions",
4778 std::to_string(options_list[i].disable_auto_compactions)},
4779 {"level0_slowdown_writes_trigger",
4780 std::to_string(options_list[i].level0_slowdown_writes_trigger)},
4781 {"level0_stop_writes_trigger",
4782 std::to_string(options_list[i].level0_stop_writes_trigger)}});
4783 }
4784 return Status::OK();
4785 #else
4786 (void)thread;
4787 (void)compaction_style;
4788 (void)write_mode;
4789 fprintf(stderr, "Rocksdb Lite doesn't support filldeterministic\n");
4790 return Status::NotSupported(
4791 "Rocksdb Lite doesn't support filldeterministic");
4792 #endif // ROCKSDB_LITE
4793 }
4794
ReadSequential(ThreadState * thread)4795 void ReadSequential(ThreadState* thread) {
4796 if (db_.db != nullptr) {
4797 ReadSequential(thread, db_.db);
4798 } else {
4799 for (const auto& db_with_cfh : multi_dbs_) {
4800 ReadSequential(thread, db_with_cfh.db);
4801 }
4802 }
4803 }
4804
ReadSequential(ThreadState * thread,DB * db)4805 void ReadSequential(ThreadState* thread, DB* db) {
4806 ReadOptions options(FLAGS_verify_checksum, true);
4807 options.tailing = FLAGS_use_tailing_iterator;
4808
4809 Iterator* iter = db->NewIterator(options);
4810 int64_t i = 0;
4811 int64_t bytes = 0;
4812 for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
4813 bytes += iter->key().size() + iter->value().size();
4814 thread->stats.FinishedOps(nullptr, db, 1, kRead);
4815 ++i;
4816
4817 if (thread->shared->read_rate_limiter.get() != nullptr &&
4818 i % 1024 == 1023) {
4819 thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
4820 nullptr /* stats */,
4821 RateLimiter::OpType::kRead);
4822 }
4823 }
4824
4825 delete iter;
4826 thread->stats.AddBytes(bytes);
4827 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
4828 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
4829 get_perf_context()->ToString());
4830 }
4831 }
4832
ReadToRowCache(ThreadState * thread)4833 void ReadToRowCache(ThreadState* thread) {
4834 int64_t read = 0;
4835 int64_t found = 0;
4836 int64_t bytes = 0;
4837 int64_t key_rand = 0;
4838 ReadOptions options(FLAGS_verify_checksum, true);
4839 std::unique_ptr<const char[]> key_guard;
4840 Slice key = AllocateKey(&key_guard);
4841 PinnableSlice pinnable_val;
4842
4843 while (key_rand < FLAGS_num) {
4844 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
4845 // We use same key_rand as seed for key and column family so that we can
4846 // deterministically find the cfh corresponding to a particular key, as it
4847 // is done in DoWrite method.
4848 GenerateKeyFromInt(key_rand, FLAGS_num, &key);
4849 key_rand++;
4850 read++;
4851 Status s;
4852 if (FLAGS_num_column_families > 1) {
4853 s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
4854 &pinnable_val);
4855 } else {
4856 pinnable_val.Reset();
4857 s = db_with_cfh->db->Get(options,
4858 db_with_cfh->db->DefaultColumnFamily(), key,
4859 &pinnable_val);
4860 }
4861
4862 if (s.ok()) {
4863 found++;
4864 bytes += key.size() + pinnable_val.size();
4865 } else if (!s.IsNotFound()) {
4866 fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
4867 abort();
4868 }
4869
4870 if (thread->shared->read_rate_limiter.get() != nullptr &&
4871 read % 256 == 255) {
4872 thread->shared->read_rate_limiter->Request(
4873 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
4874 }
4875
4876 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
4877 }
4878
4879 char msg[100];
4880 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found,
4881 read);
4882
4883 thread->stats.AddBytes(bytes);
4884 thread->stats.AddMessage(msg);
4885
4886 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
4887 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
4888 get_perf_context()->ToString());
4889 }
4890 }
4891
ReadReverse(ThreadState * thread)4892 void ReadReverse(ThreadState* thread) {
4893 if (db_.db != nullptr) {
4894 ReadReverse(thread, db_.db);
4895 } else {
4896 for (const auto& db_with_cfh : multi_dbs_) {
4897 ReadReverse(thread, db_with_cfh.db);
4898 }
4899 }
4900 }
4901
ReadReverse(ThreadState * thread,DB * db)4902 void ReadReverse(ThreadState* thread, DB* db) {
4903 Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
4904 int64_t i = 0;
4905 int64_t bytes = 0;
4906 for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
4907 bytes += iter->key().size() + iter->value().size();
4908 thread->stats.FinishedOps(nullptr, db, 1, kRead);
4909 ++i;
4910 if (thread->shared->read_rate_limiter.get() != nullptr &&
4911 i % 1024 == 1023) {
4912 thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
4913 nullptr /* stats */,
4914 RateLimiter::OpType::kRead);
4915 }
4916 }
4917 delete iter;
4918 thread->stats.AddBytes(bytes);
4919 }
4920
ReadRandomFast(ThreadState * thread)4921 void ReadRandomFast(ThreadState* thread) {
4922 int64_t read = 0;
4923 int64_t found = 0;
4924 int64_t nonexist = 0;
4925 ReadOptions options(FLAGS_verify_checksum, true);
4926 std::unique_ptr<const char[]> key_guard;
4927 Slice key = AllocateKey(&key_guard);
4928 std::string value;
4929 DB* db = SelectDBWithCfh(thread)->db;
4930
4931 int64_t pot = 1;
4932 while (pot < FLAGS_num) {
4933 pot <<= 1;
4934 }
4935
4936 Duration duration(FLAGS_duration, reads_);
4937 do {
4938 for (int i = 0; i < 100; ++i) {
4939 int64_t key_rand = thread->rand.Next() & (pot - 1);
4940 GenerateKeyFromInt(key_rand, FLAGS_num, &key);
4941 ++read;
4942 auto status = db->Get(options, key, &value);
4943 if (status.ok()) {
4944 ++found;
4945 } else if (!status.IsNotFound()) {
4946 fprintf(stderr, "Get returned an error: %s\n",
4947 status.ToString().c_str());
4948 abort();
4949 }
4950 if (key_rand >= FLAGS_num) {
4951 ++nonexist;
4952 }
4953 }
4954 if (thread->shared->read_rate_limiter.get() != nullptr) {
4955 thread->shared->read_rate_limiter->Request(
4956 100, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
4957 }
4958
4959 thread->stats.FinishedOps(nullptr, db, 100, kRead);
4960 } while (!duration.Done(100));
4961
4962 char msg[100];
4963 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found, "
4964 "issued %" PRIu64 " non-exist keys)\n",
4965 found, read, nonexist);
4966
4967 thread->stats.AddMessage(msg);
4968
4969 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
4970 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
4971 get_perf_context()->ToString());
4972 }
4973 }
4974
GetRandomKey(Random64 * rand)4975 int64_t GetRandomKey(Random64* rand) {
4976 uint64_t rand_int = rand->Next();
4977 int64_t key_rand;
4978 if (read_random_exp_range_ == 0) {
4979 key_rand = rand_int % FLAGS_num;
4980 } else {
4981 const uint64_t kBigInt = static_cast<uint64_t>(1U) << 62;
4982 long double order = -static_cast<long double>(rand_int % kBigInt) /
4983 static_cast<long double>(kBigInt) *
4984 read_random_exp_range_;
4985 long double exp_ran = std::exp(order);
4986 uint64_t rand_num =
4987 static_cast<int64_t>(exp_ran * static_cast<long double>(FLAGS_num));
4988 // Map to a different number to avoid locality.
4989 const uint64_t kBigPrime = 0x5bd1e995;
4990 // Overflow is like %(2^64). Will have little impact of results.
4991 key_rand = static_cast<int64_t>((rand_num * kBigPrime) % FLAGS_num);
4992 }
4993 return key_rand;
4994 }
4995
ReadRandom(ThreadState * thread)4996 void ReadRandom(ThreadState* thread) {
4997 int64_t read = 0;
4998 int64_t found = 0;
4999 int64_t bytes = 0;
5000 int num_keys = 0;
5001 int64_t key_rand = GetRandomKey(&thread->rand);
5002 ReadOptions options(FLAGS_verify_checksum, true);
5003 std::unique_ptr<const char[]> key_guard;
5004 Slice key = AllocateKey(&key_guard);
5005 PinnableSlice pinnable_val;
5006
5007 Duration duration(FLAGS_duration, reads_);
5008 while (!duration.Done(1)) {
5009 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
5010 // We use same key_rand as seed for key and column family so that we can
5011 // deterministically find the cfh corresponding to a particular key, as it
5012 // is done in DoWrite method.
5013 GenerateKeyFromInt(key_rand, FLAGS_num, &key);
5014 if (entries_per_batch_ > 1 && FLAGS_multiread_stride) {
5015 if (++num_keys == entries_per_batch_) {
5016 num_keys = 0;
5017 key_rand = GetRandomKey(&thread->rand);
5018 if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
5019 FLAGS_num) {
5020 key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
5021 }
5022 } else {
5023 key_rand += FLAGS_multiread_stride;
5024 }
5025 } else {
5026 key_rand = GetRandomKey(&thread->rand);
5027 }
5028 read++;
5029 Status s;
5030 if (FLAGS_num_column_families > 1) {
5031 s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
5032 &pinnable_val);
5033 } else {
5034 pinnable_val.Reset();
5035 s = db_with_cfh->db->Get(options,
5036 db_with_cfh->db->DefaultColumnFamily(), key,
5037 &pinnable_val);
5038 }
5039 if (s.ok()) {
5040 found++;
5041 bytes += key.size() + pinnable_val.size();
5042 } else if (!s.IsNotFound()) {
5043 fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
5044 abort();
5045 }
5046
5047 if (thread->shared->read_rate_limiter.get() != nullptr &&
5048 read % 256 == 255) {
5049 thread->shared->read_rate_limiter->Request(
5050 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
5051 }
5052
5053 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
5054 }
5055
5056 char msg[100];
5057 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
5058 found, read);
5059
5060 thread->stats.AddBytes(bytes);
5061 thread->stats.AddMessage(msg);
5062
5063 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
5064 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
5065 get_perf_context()->ToString());
5066 }
5067 }
5068
5069 // Calls MultiGet over a list of keys from a random distribution.
5070 // Returns the total number of keys found.
MultiReadRandom(ThreadState * thread)5071 void MultiReadRandom(ThreadState* thread) {
5072 int64_t read = 0;
5073 int64_t num_multireads = 0;
5074 int64_t found = 0;
5075 ReadOptions options(FLAGS_verify_checksum, true);
5076 std::vector<Slice> keys;
5077 std::vector<std::unique_ptr<const char[]> > key_guards;
5078 std::vector<std::string> values(entries_per_batch_);
5079 PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_];
5080 std::unique_ptr<PinnableSlice[]> pin_values_guard(pin_values);
5081 std::vector<Status> stat_list(entries_per_batch_);
5082 while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
5083 key_guards.push_back(std::unique_ptr<const char[]>());
5084 keys.push_back(AllocateKey(&key_guards.back()));
5085 }
5086
5087 Duration duration(FLAGS_duration, reads_);
5088 while (!duration.Done(1)) {
5089 DB* db = SelectDB(thread);
5090 if (FLAGS_multiread_stride) {
5091 int64_t key = GetRandomKey(&thread->rand);
5092 if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
5093 static_cast<int64_t>(FLAGS_num)) {
5094 key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
5095 }
5096 for (int64_t i = 0; i < entries_per_batch_; ++i) {
5097 GenerateKeyFromInt(key, FLAGS_num, &keys[i]);
5098 key += FLAGS_multiread_stride;
5099 }
5100 } else {
5101 for (int64_t i = 0; i < entries_per_batch_; ++i) {
5102 GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
5103 }
5104 }
5105 if (!FLAGS_multiread_batched) {
5106 std::vector<Status> statuses = db->MultiGet(options, keys, &values);
5107 assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
5108
5109 read += entries_per_batch_;
5110 num_multireads++;
5111 for (int64_t i = 0; i < entries_per_batch_; ++i) {
5112 if (statuses[i].ok()) {
5113 ++found;
5114 } else if (!statuses[i].IsNotFound()) {
5115 fprintf(stderr, "MultiGet returned an error: %s\n",
5116 statuses[i].ToString().c_str());
5117 abort();
5118 }
5119 }
5120 } else {
5121 db->MultiGet(options, db->DefaultColumnFamily(), keys.size(),
5122 keys.data(), pin_values, stat_list.data());
5123
5124 read += entries_per_batch_;
5125 num_multireads++;
5126 for (int64_t i = 0; i < entries_per_batch_; ++i) {
5127 if (stat_list[i].ok()) {
5128 ++found;
5129 } else if (!stat_list[i].IsNotFound()) {
5130 fprintf(stderr, "MultiGet returned an error: %s\n",
5131 stat_list[i].ToString().c_str());
5132 abort();
5133 }
5134 stat_list[i] = Status::OK();
5135 pin_values[i].Reset();
5136 }
5137 }
5138 if (thread->shared->read_rate_limiter.get() != nullptr &&
5139 num_multireads % 256 == 255) {
5140 thread->shared->read_rate_limiter->Request(
5141 256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */,
5142 RateLimiter::OpType::kRead);
5143 }
5144 thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
5145 }
5146
5147 char msg[100];
5148 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
5149 found, read);
5150 thread->stats.AddMessage(msg);
5151 }
5152
// Inverse CDF of the generalized Pareto distribution with location `theta`,
// shape `k` and scale `sigma`. Maps a uniform sample `u` to a value that is
// rounded up to the next integer. k == 0 uses the exponential limit
// theta - sigma * log(u).
int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
  const double value =
      (k == 0.0) ? theta - sigma * std::log(u)
                 : theta + sigma * (std::pow(u, -1 * k) - 1) / k;
  return static_cast<int64_t>(std::ceil(value));
}
// Inverse of the power-law CDF u = a * x^b. Returns (u / a)^(1 / b), rounded
// up to the next integer.
int64_t PowerCdfInversion(double u, double a, double b) {
  const double inverse_exponent = 1 / b;
  const double x = std::pow(u / a, inverse_exponent);
  return static_cast<int64_t>(std::ceil(x));
}
5169
5170 // Add the noice to the QPS
AddNoise(double origin,double noise_ratio)5171 double AddNoise(double origin, double noise_ratio) {
5172 if (noise_ratio < 0.0 || noise_ratio > 1.0) {
5173 return origin;
5174 }
5175 int band_int = static_cast<int>(FLAGS_sine_a);
5176 double delta = (rand() % band_int - band_int / 2) * noise_ratio;
5177 if (origin + delta < 0) {
5178 return origin;
5179 } else {
5180 return (origin + delta);
5181 }
5182 }
5183
5184 // Decide the ratio of different query types
5185 // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
5186 class QueryDecider {
5187 public:
5188 std::vector<int> type_;
5189 std::vector<double> ratio_;
5190 int range_;
5191
QueryDecider()5192 QueryDecider() {}
~QueryDecider()5193 ~QueryDecider() {}
5194
Initiate(std::vector<double> ratio_input)5195 Status Initiate(std::vector<double> ratio_input) {
5196 int range_max = 1000;
5197 double sum = 0.0;
5198 for (auto& ratio : ratio_input) {
5199 sum += ratio;
5200 }
5201 range_ = 0;
5202 for (auto& ratio : ratio_input) {
5203 range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
5204 type_.push_back(range_);
5205 ratio_.push_back(ratio / sum);
5206 }
5207 return Status::OK();
5208 }
5209
GetType(int64_t rand_num)5210 int GetType(int64_t rand_num) {
5211 if (rand_num < 0) {
5212 rand_num = rand_num * (-1);
5213 }
5214 assert(range_ != 0);
5215 int pos = static_cast<int>(rand_num % range_);
5216 for (int i = 0; i < static_cast<int>(type_.size()); i++) {
5217 if (pos < type_[i]) {
5218 return i;
5219 }
5220 }
5221 return 0;
5222 }
5223 };
5224
// One entry in the key-range lookup table (GenerateTwoTermExpKeys stores a
// vector of these in keyrange_set_). A uniformly distributed random value is
// mapped through the vector to a single key range, and each range's share of
// the random-value space is sized by its hotness.
struct KeyrangeUnit {
  // Presumably the first random value that maps to this key range
  // (TODO confirm against InitiateExpDistribution).
  int64_t keyrange_start;
  // Presumably the size of this range's share of the random-value space,
  // i.e. its relative access frequency (TODO confirm).
  int64_t keyrange_access;
  // Presumably the number of keys in this key range (TODO confirm).
  int64_t keyrange_keys;
};
5232
// From our observations, the prefix hotness (key-range hotness) follows
// the two-term exponential distribution: f(x) = a*exp(b*x) + c*exp(d*x).
// However, we cannot directly use its inverse function to pick a key-range
// from a random value. Instead, we create a list of KeyrangeUnits. Each one
// occupies a range of integers whose size is decided by the hotness of its
// key-range. A random value drawn from a uniform distribution is mapped into
// the KeyrangeUnit vector, which selects one KeyrangeUnit. The probability of
// a KeyrangeUnit being selected therefore equals its hotness. After that,
// the key can either be placed uniformly at random within the selected
// key-range, or its offset within the key-range can be generated from the
// power distribution (y=ax^b). This way, the generated keyID reflects both
// the hotness of the prefix and the key hotness distribution.
5246 class GenerateTwoTermExpKeys {
5247 public:
5248 int64_t keyrange_rand_max_;
5249 int64_t keyrange_size_;
5250 int64_t keyrange_num_;
5251 bool initiated_;
5252 std::vector<KeyrangeUnit> keyrange_set_;
5253
GenerateTwoTermExpKeys()5254 GenerateTwoTermExpKeys() {
5255 keyrange_rand_max_ = FLAGS_num;
5256 initiated_ = false;
5257 }
5258
~GenerateTwoTermExpKeys()5259 ~GenerateTwoTermExpKeys() {}
5260
5261 // Initiate the KeyrangeUnit vector and calculate the size of each
5262 // KeyrangeUnit.
InitiateExpDistribution(int64_t total_keys,double prefix_a,double prefix_b,double prefix_c,double prefix_d)5263 Status InitiateExpDistribution(int64_t total_keys, double prefix_a,
5264 double prefix_b, double prefix_c,
5265 double prefix_d) {
5266 int64_t amplify = 0;
5267 int64_t keyrange_start = 0;
5268 initiated_ = true;
5269 if (FLAGS_keyrange_num <= 0) {
5270 keyrange_num_ = 1;
5271 } else {
5272 keyrange_num_ = FLAGS_keyrange_num;
5273 }
5274 keyrange_size_ = total_keys / keyrange_num_;
5275
5276 // Calculate the key-range shares size based on the input parameters
5277 for (int64_t pfx = keyrange_num_; pfx >= 1; pfx--) {
5278 // Step 1. Calculate the probability that this key range will be
5279 // accessed in a query. It is based on the two-term expoential
5280 // distribution
5281 double keyrange_p = prefix_a * std::exp(prefix_b * pfx) +
5282 prefix_c * std::exp(prefix_d * pfx);
5283 if (keyrange_p < std::pow(10.0, -16.0)) {
5284 keyrange_p = 0.0;
5285 }
5286 // Step 2. Calculate the amplify
5287 // In order to allocate a query to a key-range based on the random
5288 // number generated for this query, we need to extend the probability
5289 // of each key range from [0,1] to [0, amplify]. Amplify is calculated
5290 // by 1/(smallest key-range probability). In this way, we ensure that
5291 // all key-ranges are assigned with an Integer that >=0
5292 if (amplify == 0 && keyrange_p > 0) {
5293 amplify = static_cast<int64_t>(std::floor(1 / keyrange_p)) + 1;
5294 }
5295
5296 // Step 3. For each key-range, we calculate its position in the
5297 // [0, amplify] range, including the start, the size (keyrange_access)
5298 KeyrangeUnit p_unit;
5299 p_unit.keyrange_start = keyrange_start;
5300 if (0.0 >= keyrange_p) {
5301 p_unit.keyrange_access = 0;
5302 } else {
5303 p_unit.keyrange_access =
5304 static_cast<int64_t>(std::floor(amplify * keyrange_p));
5305 }
5306 p_unit.keyrange_keys = keyrange_size_;
5307 keyrange_set_.push_back(p_unit);
5308 keyrange_start += p_unit.keyrange_access;
5309 }
5310 keyrange_rand_max_ = keyrange_start;
5311
5312 // Step 4. Shuffle the key-ranges randomly
5313 // Since the access probability is calculated from small to large,
5314 // If we do not re-allocate them, hot key-ranges are always at the end
5315 // and cold key-ranges are at the begin of the key space. Therefore, the
5316 // key-ranges are shuffled and the rand seed is only decide by the
5317 // key-range hotness distribution. With the same distribution parameters
5318 // the shuffle results are the same.
5319 Random64 rand_loca(keyrange_rand_max_);
5320 for (int64_t i = 0; i < FLAGS_keyrange_num; i++) {
5321 int64_t pos = rand_loca.Next() % FLAGS_keyrange_num;
5322 assert(i >= 0 && i < static_cast<int64_t>(keyrange_set_.size()) &&
5323 pos >= 0 && pos < static_cast<int64_t>(keyrange_set_.size()));
5324 std::swap(keyrange_set_[i], keyrange_set_[pos]);
5325 }
5326
5327 // Step 5. Recalculate the prefix start postion after shuffling
5328 int64_t offset = 0;
5329 for (auto& p_unit : keyrange_set_) {
5330 p_unit.keyrange_start = offset;
5331 offset += p_unit.keyrange_access;
5332 }
5333
5334 return Status::OK();
5335 }
5336
5337 // Generate the Key ID according to the input ini_rand and key distribution
DistGetKeyID(int64_t ini_rand,double key_dist_a,double key_dist_b)5338 int64_t DistGetKeyID(int64_t ini_rand, double key_dist_a,
5339 double key_dist_b) {
5340 int64_t keyrange_rand = ini_rand % keyrange_rand_max_;
5341
5342 // Calculate and select one key-range that contains the new key
5343 int64_t start = 0, end = static_cast<int64_t>(keyrange_set_.size());
5344 while (start + 1 < end) {
5345 int64_t mid = start + (end - start) / 2;
5346 assert(mid >= 0 && mid < static_cast<int64_t>(keyrange_set_.size()));
5347 if (keyrange_rand < keyrange_set_[mid].keyrange_start) {
5348 end = mid;
5349 } else {
5350 start = mid;
5351 }
5352 }
5353 int64_t keyrange_id = start;
5354
5355 // Select one key in the key-range and compose the keyID
5356 int64_t key_offset = 0, key_seed;
5357 if (key_dist_a == 0.0 && key_dist_b == 0.0) {
5358 key_offset = ini_rand % keyrange_size_;
5359 } else {
5360 key_seed = static_cast<int64_t>(
5361 ceil(std::pow((ini_rand / key_dist_a), (1 / key_dist_b))));
5362 Random64 rand_key(key_seed);
5363 key_offset = static_cast<int64_t>(rand_key.Next()) % keyrange_size_;
5364 }
5365 return keyrange_size_ * keyrange_id + key_offset;
5366 }
5367 };
5368
5369 // The social graph wokrload mixed with Get, Put, Iterator queries.
5370 // The value size and iterator length follow Pareto distribution.
5371 // The overall key access follow power distribution. If user models the
5372 // workload based on different key-ranges (or different prefixes), user
5373 // can use two-term-exponential distribution to fit the workload. User
5374 // needs to decides the ratio between Get, Put, Iterator queries before
5375 // starting the benchmark.
MixGraph(ThreadState * thread)5376 void MixGraph(ThreadState* thread) {
5377 int64_t read = 0; // including single gets and Next of iterators
5378 int64_t gets = 0;
5379 int64_t puts = 0;
5380 int64_t found = 0;
5381 int64_t seek = 0;
5382 int64_t seek_found = 0;
5383 int64_t bytes = 0;
5384 const int64_t default_value_max = 1 * 1024 * 1024;
5385 int64_t value_max = default_value_max;
5386 int64_t scan_len_max = FLAGS_mix_max_scan_len;
5387 double write_rate = 1000000.0;
5388 double read_rate = 1000000.0;
5389 bool use_prefix_modeling = false;
5390 GenerateTwoTermExpKeys gen_exp;
5391 std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
5392 FLAGS_mix_seek_ratio};
5393 char value_buffer[default_value_max];
5394 QueryDecider query;
5395 RandomGenerator gen;
5396 Status s;
5397 if (value_max > FLAGS_mix_max_value_size) {
5398 value_max = FLAGS_mix_max_value_size;
5399 }
5400
5401 ReadOptions options(FLAGS_verify_checksum, true);
5402 std::unique_ptr<const char[]> key_guard;
5403 Slice key = AllocateKey(&key_guard);
5404 PinnableSlice pinnable_val;
5405 query.Initiate(ratio);
5406
5407 // the limit of qps initiation
5408 if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
5409 thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
5410 static_cast<int64_t>(read_rate), 100000 /* refill_period_us */, 10 /* fairness */,
5411 RateLimiter::Mode::kReadsOnly));
5412 thread->shared->write_rate_limiter.reset(
5413 NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
5414 }
5415
5416 // Decide if user wants to use prefix based key generation
5417 if (FLAGS_keyrange_dist_a != 0.0 || FLAGS_keyrange_dist_b != 0.0 ||
5418 FLAGS_keyrange_dist_c != 0.0 || FLAGS_keyrange_dist_d != 0.0) {
5419 use_prefix_modeling = true;
5420 gen_exp.InitiateExpDistribution(
5421 FLAGS_num, FLAGS_keyrange_dist_a, FLAGS_keyrange_dist_b,
5422 FLAGS_keyrange_dist_c, FLAGS_keyrange_dist_d);
5423 }
5424
5425 Duration duration(FLAGS_duration, reads_);
5426 while (!duration.Done(1)) {
5427 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
5428 int64_t ini_rand, rand_v, key_rand, key_seed;
5429 ini_rand = GetRandomKey(&thread->rand);
5430 rand_v = ini_rand % FLAGS_num;
5431 double u = static_cast<double>(rand_v) / FLAGS_num;
5432
5433 // Generate the keyID based on the key hotness and prefix hotness
5434 if (use_prefix_modeling) {
5435 key_rand =
5436 gen_exp.DistGetKeyID(ini_rand, FLAGS_key_dist_a, FLAGS_key_dist_b);
5437 } else {
5438 key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
5439 Random64 rand(key_seed);
5440 key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
5441 }
5442 GenerateKeyFromInt(key_rand, FLAGS_num, &key);
5443 int query_type = query.GetType(rand_v);
5444
5445 // change the qps
5446 uint64_t now = FLAGS_env->NowMicros();
5447 uint64_t usecs_since_last;
5448 if (now > thread->stats.GetSineInterval()) {
5449 usecs_since_last = now - thread->stats.GetSineInterval();
5450 } else {
5451 usecs_since_last = 0;
5452 }
5453
5454 if (usecs_since_last >
5455 (FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
5456 double usecs_since_start =
5457 static_cast<double>(now - thread->stats.GetStart());
5458 thread->stats.ResetSineInterval();
5459 double mix_rate_with_noise = AddNoise(
5460 SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
5461 read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
5462 write_rate =
5463 mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
5464
5465 thread->shared->write_rate_limiter.reset(
5466 NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
5467 thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
5468 static_cast<int64_t>(read_rate),
5469 FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
5470 RateLimiter::Mode::kReadsOnly));
5471 }
5472 // Start the query
5473 if (query_type == 0) {
5474 // the Get query
5475 gets++;
5476 read++;
5477 if (FLAGS_num_column_families > 1) {
5478 s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
5479 &pinnable_val);
5480 } else {
5481 pinnable_val.Reset();
5482 s = db_with_cfh->db->Get(options,
5483 db_with_cfh->db->DefaultColumnFamily(), key,
5484 &pinnable_val);
5485 }
5486
5487 if (s.ok()) {
5488 found++;
5489 bytes += key.size() + pinnable_val.size();
5490 } else if (!s.IsNotFound()) {
5491 fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
5492 abort();
5493 }
5494
5495 if (thread->shared->read_rate_limiter.get() != nullptr &&
5496 read % 256 == 255) {
5497 thread->shared->read_rate_limiter->Request(
5498 256, Env::IO_HIGH, nullptr /* stats */,
5499 RateLimiter::OpType::kRead);
5500 }
5501 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
5502 } else if (query_type == 1) {
5503 // the Put query
5504 puts++;
5505 int64_t val_size = ParetoCdfInversion(
5506 u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
5507 if (val_size < 0) {
5508 val_size = 10;
5509 } else if (val_size > value_max) {
5510 val_size = val_size % value_max;
5511 }
5512 s = db_with_cfh->db->Put(
5513 write_options_, key,
5514 gen.Generate(static_cast<unsigned int>(val_size)));
5515 if (!s.ok()) {
5516 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
5517 exit(1);
5518 }
5519
5520 if (thread->shared->write_rate_limiter) {
5521 thread->shared->write_rate_limiter->Request(
5522 key.size() + val_size, Env::IO_HIGH, nullptr /*stats*/,
5523 RateLimiter::OpType::kWrite);
5524 }
5525 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
5526 } else if (query_type == 2) {
5527 // Seek query
5528 if (db_with_cfh->db != nullptr) {
5529 Iterator* single_iter = nullptr;
5530 single_iter = db_with_cfh->db->NewIterator(options);
5531 if (single_iter != nullptr) {
5532 single_iter->Seek(key);
5533 seek++;
5534 read++;
5535 if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
5536 seek_found++;
5537 }
5538 int64_t scan_length =
5539 ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
5540 FLAGS_iter_sigma) %
5541 scan_len_max;
5542 for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
5543 Slice value = single_iter->value();
5544 memcpy(value_buffer, value.data(),
5545 std::min(value.size(), sizeof(value_buffer)));
5546 bytes += single_iter->key().size() + single_iter->value().size();
5547 single_iter->Next();
5548 assert(single_iter->status().ok());
5549 }
5550 }
5551 delete single_iter;
5552 }
5553 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
5554 }
5555 }
5556 char msg[256];
5557 snprintf(msg, sizeof(msg),
5558 "( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64
5559 " in %" PRIu64 " found)\n",
5560 gets, puts, seek, found, read);
5561
5562 thread->stats.AddBytes(bytes);
5563 thread->stats.AddMessage(msg);
5564
5565 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
5566 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
5567 get_perf_context()->ToString());
5568 }
5569 }
5570
IteratorCreation(ThreadState * thread)5571 void IteratorCreation(ThreadState* thread) {
5572 Duration duration(FLAGS_duration, reads_);
5573 ReadOptions options(FLAGS_verify_checksum, true);
5574 while (!duration.Done(1)) {
5575 DB* db = SelectDB(thread);
5576 Iterator* iter = db->NewIterator(options);
5577 delete iter;
5578 thread->stats.FinishedOps(nullptr, db, 1, kOthers);
5579 }
5580 }
5581
IteratorCreationWhileWriting(ThreadState * thread)5582 void IteratorCreationWhileWriting(ThreadState* thread) {
5583 if (thread->tid > 0) {
5584 IteratorCreation(thread);
5585 } else {
5586 BGWriter(thread, kWrite);
5587 }
5588 }
5589
SeekRandom(ThreadState * thread)5590 void SeekRandom(ThreadState* thread) {
5591 int64_t read = 0;
5592 int64_t found = 0;
5593 int64_t bytes = 0;
5594 ReadOptions options(FLAGS_verify_checksum, true);
5595 options.total_order_seek = FLAGS_total_order_seek;
5596 options.prefix_same_as_start = FLAGS_prefix_same_as_start;
5597 options.tailing = FLAGS_use_tailing_iterator;
5598 options.readahead_size = FLAGS_readahead_size;
5599
5600 Iterator* single_iter = nullptr;
5601 std::vector<Iterator*> multi_iters;
5602 if (db_.db != nullptr) {
5603 single_iter = db_.db->NewIterator(options);
5604 } else {
5605 for (const auto& db_with_cfh : multi_dbs_) {
5606 multi_iters.push_back(db_with_cfh.db->NewIterator(options));
5607 }
5608 }
5609
5610 std::unique_ptr<const char[]> key_guard;
5611 Slice key = AllocateKey(&key_guard);
5612
5613 std::unique_ptr<const char[]> upper_bound_key_guard;
5614 Slice upper_bound = AllocateKey(&upper_bound_key_guard);
5615 std::unique_ptr<const char[]> lower_bound_key_guard;
5616 Slice lower_bound = AllocateKey(&lower_bound_key_guard);
5617
5618 Duration duration(FLAGS_duration, reads_);
5619 char value_buffer[256];
5620 while (!duration.Done(1)) {
5621 int64_t seek_pos = thread->rand.Next() % FLAGS_num;
5622 GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
5623 &key);
5624 if (FLAGS_max_scan_distance != 0) {
5625 if (FLAGS_reverse_iterator) {
5626 GenerateKeyFromInt(
5627 static_cast<uint64_t>(std::max(
5628 static_cast<int64_t>(0), seek_pos - FLAGS_max_scan_distance)),
5629 FLAGS_num, &lower_bound);
5630 options.iterate_lower_bound = &lower_bound;
5631 } else {
5632 auto min_num =
5633 std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance);
5634 GenerateKeyFromInt(static_cast<uint64_t>(min_num), FLAGS_num,
5635 &upper_bound);
5636 options.iterate_upper_bound = &upper_bound;
5637 }
5638 }
5639
5640 if (!FLAGS_use_tailing_iterator) {
5641 if (db_.db != nullptr) {
5642 delete single_iter;
5643 single_iter = db_.db->NewIterator(options);
5644 } else {
5645 for (auto iter : multi_iters) {
5646 delete iter;
5647 }
5648 multi_iters.clear();
5649 for (const auto& db_with_cfh : multi_dbs_) {
5650 multi_iters.push_back(db_with_cfh.db->NewIterator(options));
5651 }
5652 }
5653 }
5654 // Pick a Iterator to use
5655 Iterator* iter_to_use = single_iter;
5656 if (single_iter == nullptr) {
5657 iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
5658 }
5659
5660 iter_to_use->Seek(key);
5661 read++;
5662 if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
5663 found++;
5664 }
5665
5666 for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
5667 // Copy out iterator's value to make sure we read them.
5668 Slice value = iter_to_use->value();
5669 memcpy(value_buffer, value.data(),
5670 std::min(value.size(), sizeof(value_buffer)));
5671 bytes += iter_to_use->key().size() + iter_to_use->value().size();
5672
5673 if (!FLAGS_reverse_iterator) {
5674 iter_to_use->Next();
5675 } else {
5676 iter_to_use->Prev();
5677 }
5678 assert(iter_to_use->status().ok());
5679 }
5680
5681 if (thread->shared->read_rate_limiter.get() != nullptr &&
5682 read % 256 == 255) {
5683 thread->shared->read_rate_limiter->Request(
5684 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
5685 }
5686
5687 thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
5688 }
5689 delete single_iter;
5690 for (auto iter : multi_iters) {
5691 delete iter;
5692 }
5693
5694 char msg[100];
5695 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
5696 found, read);
5697 thread->stats.AddBytes(bytes);
5698 thread->stats.AddMessage(msg);
5699 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
5700 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
5701 get_perf_context()->ToString());
5702 }
5703 }
5704
SeekRandomWhileWriting(ThreadState * thread)5705 void SeekRandomWhileWriting(ThreadState* thread) {
5706 if (thread->tid > 0) {
5707 SeekRandom(thread);
5708 } else {
5709 BGWriter(thread, kWrite);
5710 }
5711 }
5712
SeekRandomWhileMerging(ThreadState * thread)5713 void SeekRandomWhileMerging(ThreadState* thread) {
5714 if (thread->tid > 0) {
5715 SeekRandom(thread);
5716 } else {
5717 BGWriter(thread, kMerge);
5718 }
5719 }
5720
DoDelete(ThreadState * thread,bool seq)5721 void DoDelete(ThreadState* thread, bool seq) {
5722 WriteBatch batch;
5723 Duration duration(seq ? 0 : FLAGS_duration, deletes_);
5724 int64_t i = 0;
5725 std::unique_ptr<const char[]> key_guard;
5726 Slice key = AllocateKey(&key_guard);
5727
5728 while (!duration.Done(entries_per_batch_)) {
5729 DB* db = SelectDB(thread);
5730 batch.Clear();
5731 for (int64_t j = 0; j < entries_per_batch_; ++j) {
5732 const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
5733 GenerateKeyFromInt(k, FLAGS_num, &key);
5734 batch.Delete(key);
5735 }
5736 auto s = db->Write(write_options_, &batch);
5737 thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
5738 if (!s.ok()) {
5739 fprintf(stderr, "del error: %s\n", s.ToString().c_str());
5740 exit(1);
5741 }
5742 i += entries_per_batch_;
5743 }
5744 }
5745
DeleteSeq(ThreadState * thread)5746 void DeleteSeq(ThreadState* thread) {
5747 DoDelete(thread, true);
5748 }
5749
DeleteRandom(ThreadState * thread)5750 void DeleteRandom(ThreadState* thread) {
5751 DoDelete(thread, false);
5752 }
5753
ReadWhileWriting(ThreadState * thread)5754 void ReadWhileWriting(ThreadState* thread) {
5755 if (thread->tid > 0) {
5756 ReadRandom(thread);
5757 } else {
5758 BGWriter(thread, kWrite);
5759 }
5760 }
5761
ReadWhileMerging(ThreadState * thread)5762 void ReadWhileMerging(ThreadState* thread) {
5763 if (thread->tid > 0) {
5764 ReadRandom(thread);
5765 } else {
5766 BGWriter(thread, kMerge);
5767 }
5768 }
5769
BGWriter(ThreadState * thread,enum OperationType write_merge)5770 void BGWriter(ThreadState* thread, enum OperationType write_merge) {
5771 // Special thread that keeps writing until other threads are done.
5772 RandomGenerator gen;
5773 int64_t bytes = 0;
5774
5775 std::unique_ptr<RateLimiter> write_rate_limiter;
5776 if (FLAGS_benchmark_write_rate_limit > 0) {
5777 write_rate_limiter.reset(
5778 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
5779 }
5780
5781 // Don't merge stats from this thread with the readers.
5782 thread->stats.SetExcludeFromMerge();
5783
5784 std::unique_ptr<const char[]> key_guard;
5785 Slice key = AllocateKey(&key_guard);
5786 uint32_t written = 0;
5787 bool hint_printed = false;
5788
5789 while (true) {
5790 DB* db = SelectDB(thread);
5791 {
5792 MutexLock l(&thread->shared->mu);
5793 if (FLAGS_finish_after_writes && written == writes_) {
5794 fprintf(stderr, "Exiting the writer after %u writes...\n", written);
5795 break;
5796 }
5797 if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
5798 // Other threads have finished
5799 if (FLAGS_finish_after_writes) {
5800 // Wait for the writes to be finished
5801 if (!hint_printed) {
5802 fprintf(stderr, "Reads are finished. Have %d more writes to do\n",
5803 static_cast<int>(writes_) - written);
5804 hint_printed = true;
5805 }
5806 } else {
5807 // Finish the write immediately
5808 break;
5809 }
5810 }
5811 }
5812
5813 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
5814 Status s;
5815
5816 Slice val = gen.Generate();
5817 if (write_merge == kWrite) {
5818 s = db->Put(write_options_, key, val);
5819 } else {
5820 s = db->Merge(write_options_, key, val);
5821 }
5822 written++;
5823
5824 if (!s.ok()) {
5825 fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
5826 exit(1);
5827 }
5828 bytes += key.size() + val.size();
5829 thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
5830
5831 if (FLAGS_benchmark_write_rate_limit > 0) {
5832 write_rate_limiter->Request(
5833 key.size() + val.size(), Env::IO_HIGH,
5834 nullptr /* stats */, RateLimiter::OpType::kWrite);
5835 }
5836 }
5837 thread->stats.AddBytes(bytes);
5838 }
5839
ReadWhileScanning(ThreadState * thread)5840 void ReadWhileScanning(ThreadState* thread) {
5841 if (thread->tid > 0) {
5842 ReadRandom(thread);
5843 } else {
5844 BGScan(thread);
5845 }
5846 }
5847
BGScan(ThreadState * thread)5848 void BGScan(ThreadState* thread) {
5849 if (FLAGS_num_multi_db > 0) {
5850 fprintf(stderr, "Not supporting multiple DBs.\n");
5851 abort();
5852 }
5853 assert(db_.db != nullptr);
5854 ReadOptions read_options;
5855 Iterator* iter = db_.db->NewIterator(read_options);
5856
5857 fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_);
5858 Duration duration(FLAGS_duration, reads_);
5859 uint64_t num_seek_to_first = 0;
5860 uint64_t num_next = 0;
5861 while (!duration.Done(1)) {
5862 if (!iter->Valid()) {
5863 iter->SeekToFirst();
5864 num_seek_to_first++;
5865 } else if (!iter->status().ok()) {
5866 fprintf(stderr, "Iterator error: %s\n",
5867 iter->status().ToString().c_str());
5868 abort();
5869 } else {
5870 iter->Next();
5871 num_next++;
5872 }
5873
5874 thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
5875 }
5876 delete iter;
5877 }
5878
5879 // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
5880 // in DB atomically i.e in a single batch. Also refer GetMany.
PutMany(DB * db,const WriteOptions & writeoptions,const Slice & key,const Slice & value)5881 Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key,
5882 const Slice& value) {
5883 std::string suffixes[3] = {"2", "1", "0"};
5884 std::string keys[3];
5885
5886 WriteBatch batch;
5887 Status s;
5888 for (int i = 0; i < 3; i++) {
5889 keys[i] = key.ToString() + suffixes[i];
5890 batch.Put(keys[i], value);
5891 }
5892
5893 s = db->Write(writeoptions, &batch);
5894 return s;
5895 }
5896
5897
5898 // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
5899 // in DB atomically i.e in a single batch. Also refer GetMany.
DeleteMany(DB * db,const WriteOptions & writeoptions,const Slice & key)5900 Status DeleteMany(DB* db, const WriteOptions& writeoptions,
5901 const Slice& key) {
5902 std::string suffixes[3] = {"1", "2", "0"};
5903 std::string keys[3];
5904
5905 WriteBatch batch;
5906 Status s;
5907 for (int i = 0; i < 3; i++) {
5908 keys[i] = key.ToString() + suffixes[i];
5909 batch.Delete(keys[i]);
5910 }
5911
5912 s = db->Write(writeoptions, &batch);
5913 return s;
5914 }
5915
5916 // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
5917 // in the same snapshot, and verifies that all the values are identical.
5918 // ASSUMES that PutMany was used to put (K, V) into the DB.
GetMany(DB * db,const ReadOptions & readoptions,const Slice & key,std::string * value)5919 Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key,
5920 std::string* value) {
5921 std::string suffixes[3] = {"0", "1", "2"};
5922 std::string keys[3];
5923 Slice key_slices[3];
5924 std::string values[3];
5925 ReadOptions readoptionscopy = readoptions;
5926 readoptionscopy.snapshot = db->GetSnapshot();
5927 Status s;
5928 for (int i = 0; i < 3; i++) {
5929 keys[i] = key.ToString() + suffixes[i];
5930 key_slices[i] = keys[i];
5931 s = db->Get(readoptionscopy, key_slices[i], value);
5932 if (!s.ok() && !s.IsNotFound()) {
5933 fprintf(stderr, "get error: %s\n", s.ToString().c_str());
5934 values[i] = "";
5935 // we continue after error rather than exiting so that we can
5936 // find more errors if any
5937 } else if (s.IsNotFound()) {
5938 values[i] = "";
5939 } else {
5940 values[i] = *value;
5941 }
5942 }
5943 db->ReleaseSnapshot(readoptionscopy.snapshot);
5944
5945 if ((values[0] != values[1]) || (values[1] != values[2])) {
5946 fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
5947 key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
5948 values[2].c_str());
5949 // we continue after error rather than exiting so that we can
5950 // find more errors if any
5951 }
5952
5953 return s;
5954 }
5955
5956 // Differs from readrandomwriterandom in the following ways:
5957 // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
5958 // (b) Does deletes as well (per FLAGS_deletepercent)
5959 // (c) In order to achieve high % of 'found' during lookups, and to do
5960 // multiple writes (including puts and deletes) it uses upto
5961 // FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
5962 // (d) Does not have a MultiGet option.
RandomWithVerify(ThreadState * thread)5963 void RandomWithVerify(ThreadState* thread) {
5964 ReadOptions options(FLAGS_verify_checksum, true);
5965 RandomGenerator gen;
5966 std::string value;
5967 int64_t found = 0;
5968 int get_weight = 0;
5969 int put_weight = 0;
5970 int delete_weight = 0;
5971 int64_t gets_done = 0;
5972 int64_t puts_done = 0;
5973 int64_t deletes_done = 0;
5974
5975 std::unique_ptr<const char[]> key_guard;
5976 Slice key = AllocateKey(&key_guard);
5977
5978 // the number of iterations is the larger of read_ or write_
5979 for (int64_t i = 0; i < readwrites_; i++) {
5980 DB* db = SelectDB(thread);
5981 if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
5982 // one batch completed, reinitialize for next batch
5983 get_weight = FLAGS_readwritepercent;
5984 delete_weight = FLAGS_deletepercent;
5985 put_weight = 100 - get_weight - delete_weight;
5986 }
5987 GenerateKeyFromInt(thread->rand.Next() % FLAGS_numdistinct,
5988 FLAGS_numdistinct, &key);
5989 if (get_weight > 0) {
5990 // do all the gets first
5991 Status s = GetMany(db, options, key, &value);
5992 if (!s.ok() && !s.IsNotFound()) {
5993 fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
5994 // we continue after error rather than exiting so that we can
5995 // find more errors if any
5996 } else if (!s.IsNotFound()) {
5997 found++;
5998 }
5999 get_weight--;
6000 gets_done++;
6001 thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
6002 } else if (put_weight > 0) {
6003 // then do all the corresponding number of puts
6004 // for all the gets we have done earlier
6005 Status s = PutMany(db, write_options_, key, gen.Generate());
6006 if (!s.ok()) {
6007 fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
6008 exit(1);
6009 }
6010 put_weight--;
6011 puts_done++;
6012 thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
6013 } else if (delete_weight > 0) {
6014 Status s = DeleteMany(db, write_options_, key);
6015 if (!s.ok()) {
6016 fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
6017 exit(1);
6018 }
6019 delete_weight--;
6020 deletes_done++;
6021 thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
6022 }
6023 }
6024 char msg[128];
6025 snprintf(msg, sizeof(msg),
6026 "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \
6027 PRIu64 " found:%" PRIu64 ")",
6028 gets_done, puts_done, deletes_done, readwrites_, found);
6029 thread->stats.AddMessage(msg);
6030 }
6031
6032 // This is different from ReadWhileWriting because it does not use
6033 // an extra thread.
ReadRandomWriteRandom(ThreadState * thread)6034 void ReadRandomWriteRandom(ThreadState* thread) {
6035 ReadOptions options(FLAGS_verify_checksum, true);
6036 RandomGenerator gen;
6037 std::string value;
6038 int64_t found = 0;
6039 int get_weight = 0;
6040 int put_weight = 0;
6041 int64_t reads_done = 0;
6042 int64_t writes_done = 0;
6043 Duration duration(FLAGS_duration, readwrites_);
6044
6045 std::unique_ptr<const char[]> key_guard;
6046 Slice key = AllocateKey(&key_guard);
6047
6048 // the number of iterations is the larger of read_ or write_
6049 while (!duration.Done(1)) {
6050 DB* db = SelectDB(thread);
6051 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
6052 if (get_weight == 0 && put_weight == 0) {
6053 // one batch completed, reinitialize for next batch
6054 get_weight = FLAGS_readwritepercent;
6055 put_weight = 100 - get_weight;
6056 }
6057 if (get_weight > 0) {
6058 // do all the gets first
6059 Status s = db->Get(options, key, &value);
6060 if (!s.ok() && !s.IsNotFound()) {
6061 fprintf(stderr, "get error: %s\n", s.ToString().c_str());
6062 // we continue after error rather than exiting so that we can
6063 // find more errors if any
6064 } else if (!s.IsNotFound()) {
6065 found++;
6066 }
6067 get_weight--;
6068 reads_done++;
6069 thread->stats.FinishedOps(nullptr, db, 1, kRead);
6070 } else if (put_weight > 0) {
6071 // then do all the corresponding number of puts
6072 // for all the gets we have done earlier
6073 Status s = db->Put(write_options_, key, gen.Generate());
6074 if (!s.ok()) {
6075 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
6076 exit(1);
6077 }
6078 put_weight--;
6079 writes_done++;
6080 thread->stats.FinishedOps(nullptr, db, 1, kWrite);
6081 }
6082 }
6083 char msg[100];
6084 snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
6085 " total:%" PRIu64 " found:%" PRIu64 ")",
6086 reads_done, writes_done, readwrites_, found);
6087 thread->stats.AddMessage(msg);
6088 }
6089
6090 //
6091 // Read-modify-write for random keys
UpdateRandom(ThreadState * thread)6092 void UpdateRandom(ThreadState* thread) {
6093 ReadOptions options(FLAGS_verify_checksum, true);
6094 RandomGenerator gen;
6095 std::string value;
6096 int64_t found = 0;
6097 int64_t bytes = 0;
6098 Duration duration(FLAGS_duration, readwrites_);
6099
6100 std::unique_ptr<const char[]> key_guard;
6101 Slice key = AllocateKey(&key_guard);
6102 // the number of iterations is the larger of read_ or write_
6103 while (!duration.Done(1)) {
6104 DB* db = SelectDB(thread);
6105 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
6106
6107 auto status = db->Get(options, key, &value);
6108 if (status.ok()) {
6109 ++found;
6110 bytes += key.size() + value.size();
6111 } else if (!status.IsNotFound()) {
6112 fprintf(stderr, "Get returned an error: %s\n",
6113 status.ToString().c_str());
6114 abort();
6115 }
6116
6117 if (thread->shared->write_rate_limiter) {
6118 thread->shared->write_rate_limiter->Request(
6119 key.size() + value.size(), Env::IO_HIGH, nullptr /*stats*/,
6120 RateLimiter::OpType::kWrite);
6121 }
6122
6123 Slice val = gen.Generate();
6124 Status s = db->Put(write_options_, key, val);
6125 if (!s.ok()) {
6126 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
6127 exit(1);
6128 }
6129 bytes += key.size() + val.size();
6130 thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
6131 }
6132 char msg[100];
6133 snprintf(msg, sizeof(msg),
6134 "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
6135 thread->stats.AddBytes(bytes);
6136 thread->stats.AddMessage(msg);
6137 }
6138
// Read-XOR-write for random keys. XORs the existing value with a randomly
// generated value and stores the result. Assuming A is the array of bytes
// representing the existing value, we generate an array B of the same size,
// then compute C = A^B as C[i]=A[i]^B[i], and store C.
XORUpdateRandom(ThreadState * thread)6143 void XORUpdateRandom(ThreadState* thread) {
6144 ReadOptions options(FLAGS_verify_checksum, true);
6145 RandomGenerator gen;
6146 std::string existing_value;
6147 int64_t found = 0;
6148 Duration duration(FLAGS_duration, readwrites_);
6149
6150 BytesXOROperator xor_operator;
6151
6152 std::unique_ptr<const char[]> key_guard;
6153 Slice key = AllocateKey(&key_guard);
6154 // the number of iterations is the larger of read_ or write_
6155 while (!duration.Done(1)) {
6156 DB* db = SelectDB(thread);
6157 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
6158
6159 auto status = db->Get(options, key, &existing_value);
6160 if (status.ok()) {
6161 ++found;
6162 } else if (!status.IsNotFound()) {
6163 fprintf(stderr, "Get returned an error: %s\n",
6164 status.ToString().c_str());
6165 exit(1);
6166 }
6167
6168 Slice value = gen.Generate(static_cast<unsigned int>(existing_value.size()));
6169 std::string new_value;
6170
6171 if (status.ok()) {
6172 Slice existing_value_slice = Slice(existing_value);
6173 xor_operator.XOR(&existing_value_slice, value, &new_value);
6174 } else {
6175 xor_operator.XOR(nullptr, value, &new_value);
6176 }
6177
6178 Status s = db->Put(write_options_, key, Slice(new_value));
6179 if (!s.ok()) {
6180 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
6181 exit(1);
6182 }
6183 thread->stats.FinishedOps(nullptr, db, 1);
6184 }
6185 char msg[100];
6186 snprintf(msg, sizeof(msg),
6187 "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
6188 thread->stats.AddMessage(msg);
6189 }
6190
// Read-modify-write for random keys.
// Each operation causes the key's value to grow by value_size (simulating an
// append). Generally used for benchmarking against merges of a similar type.
AppendRandom(ThreadState * thread)6194 void AppendRandom(ThreadState* thread) {
6195 ReadOptions options(FLAGS_verify_checksum, true);
6196 RandomGenerator gen;
6197 std::string value;
6198 int64_t found = 0;
6199 int64_t bytes = 0;
6200
6201 std::unique_ptr<const char[]> key_guard;
6202 Slice key = AllocateKey(&key_guard);
6203 // The number of iterations is the larger of read_ or write_
6204 Duration duration(FLAGS_duration, readwrites_);
6205 while (!duration.Done(1)) {
6206 DB* db = SelectDB(thread);
6207 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
6208
6209 auto status = db->Get(options, key, &value);
6210 if (status.ok()) {
6211 ++found;
6212 bytes += key.size() + value.size();
6213 } else if (!status.IsNotFound()) {
6214 fprintf(stderr, "Get returned an error: %s\n",
6215 status.ToString().c_str());
6216 abort();
6217 } else {
6218 // If not existing, then just assume an empty string of data
6219 value.clear();
6220 }
6221
6222 // Update the value (by appending data)
6223 Slice operand = gen.Generate();
6224 if (value.size() > 0) {
6225 // Use a delimiter to match the semantics for StringAppendOperator
6226 value.append(1,',');
6227 }
6228 value.append(operand.data(), operand.size());
6229
6230 // Write back to the database
6231 Status s = db->Put(write_options_, key, value);
6232 if (!s.ok()) {
6233 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
6234 exit(1);
6235 }
6236 bytes += key.size() + value.size();
6237 thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
6238 }
6239
6240 char msg[100];
6241 snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
6242 readwrites_, found);
6243 thread->stats.AddBytes(bytes);
6244 thread->stats.AddMessage(msg);
6245 }
6246
// Read-modify-write for random keys (using MergeOperator).
// The merge operator to use should be defined by FLAGS_merge_operator.
// Adjust FLAGS_value_size so that the values are reasonable for this operator.
// Assumes that the merge operator is non-null (i.e., is well-defined).
//
// For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
// to simulate random additions over 64-bit integers using merge.
//
// The number of merges on the same key can be controlled by adjusting
// FLAGS_merge_keys.
MergeRandom(ThreadState * thread)6257 void MergeRandom(ThreadState* thread) {
6258 RandomGenerator gen;
6259 int64_t bytes = 0;
6260 std::unique_ptr<const char[]> key_guard;
6261 Slice key = AllocateKey(&key_guard);
6262 // The number of iterations is the larger of read_ or write_
6263 Duration duration(FLAGS_duration, readwrites_);
6264 while (!duration.Done(1)) {
6265 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
6266 int64_t key_rand = thread->rand.Next() % merge_keys_;
6267 GenerateKeyFromInt(key_rand, merge_keys_, &key);
6268
6269 Status s;
6270 Slice val = gen.Generate();
6271 if (FLAGS_num_column_families > 1) {
6272 s = db_with_cfh->db->Merge(write_options_,
6273 db_with_cfh->GetCfh(key_rand), key,
6274 val);
6275 } else {
6276 s = db_with_cfh->db->Merge(write_options_,
6277 db_with_cfh->db->DefaultColumnFamily(), key,
6278 val);
6279 }
6280
6281 if (!s.ok()) {
6282 fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
6283 exit(1);
6284 }
6285 bytes += key.size() + val.size();
6286 thread->stats.FinishedOps(nullptr, db_with_cfh->db, 1, kMerge);
6287 }
6288
6289 // Print some statistics
6290 char msg[100];
6291 snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_);
6292 thread->stats.AddBytes(bytes);
6293 thread->stats.AddMessage(msg);
6294 }
6295
// Read and merge random keys. The number of reads and merges is controlled
// by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
// keys (and thus also the number of reads and merges on the same key) can be
// adjusted with FLAGS_merge_keys.
//
// As with MergeRandom, the merge operator to use should be defined by
// FLAGS_merge_operator.
ReadRandomMergeRandom(ThreadState * thread)6303 void ReadRandomMergeRandom(ThreadState* thread) {
6304 ReadOptions options(FLAGS_verify_checksum, true);
6305 RandomGenerator gen;
6306 std::string value;
6307 int64_t num_hits = 0;
6308 int64_t num_gets = 0;
6309 int64_t num_merges = 0;
6310 size_t max_length = 0;
6311
6312 std::unique_ptr<const char[]> key_guard;
6313 Slice key = AllocateKey(&key_guard);
6314 // the number of iterations is the larger of read_ or write_
6315 Duration duration(FLAGS_duration, readwrites_);
6316 while (!duration.Done(1)) {
6317 DB* db = SelectDB(thread);
6318 GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);
6319
6320 bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
6321
6322 if (do_merge) {
6323 Status s = db->Merge(write_options_, key, gen.Generate());
6324 if (!s.ok()) {
6325 fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
6326 exit(1);
6327 }
6328 num_merges++;
6329 thread->stats.FinishedOps(nullptr, db, 1, kMerge);
6330 } else {
6331 Status s = db->Get(options, key, &value);
6332 if (value.length() > max_length)
6333 max_length = value.length();
6334
6335 if (!s.ok() && !s.IsNotFound()) {
6336 fprintf(stderr, "get error: %s\n", s.ToString().c_str());
6337 // we continue after error rather than exiting so that we can
6338 // find more errors if any
6339 } else if (!s.IsNotFound()) {
6340 num_hits++;
6341 }
6342 num_gets++;
6343 thread->stats.FinishedOps(nullptr, db, 1, kRead);
6344 }
6345 }
6346
6347 char msg[100];
6348 snprintf(msg, sizeof(msg),
6349 "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64
6350 " hits:%" PRIu64 " maxlength:%" ROCKSDB_PRIszt ")",
6351 num_gets, num_merges, readwrites_, num_hits, max_length);
6352 thread->stats.AddMessage(msg);
6353 }
6354
void WriteSeqSeekSeq(ThreadState* thread) {
  // Writes FLAGS_num keys with DoWrite(SEQUENTIAL), then walks them with a
  // single iterator: Seek() to key i, optionally step up to FLAGS_seek_nexts
  // times with Next() (or Prev() under --reverse_iterator), then Seek() again
  // to the last key reached. Every seek and every step is recorded as a
  // kSeek op. Positions are verified only through assert(), i.e. only in
  // debug builds.
  writes_ = FLAGS_num;
  DoWrite(thread, SEQUENTIAL);
  // exclude writes from the ops/sec calculation
  thread->stats.Start(thread->tid);

  DB* db = SelectDB(thread);
  std::unique_ptr<Iterator> iter(
      db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));

  std::unique_ptr<const char[]> key_guard;
  Slice key = AllocateKey(&key_guard);
  for (int64_t i = 0; i < FLAGS_num; ++i) {
    GenerateKeyFromInt(i, FLAGS_num, &key);
    iter->Seek(key);
    assert(iter->Valid() && iter->key() == key);
    thread->stats.FinishedOps(nullptr, db, 1, kSeek);

    // Step the iterator while advancing i in lock-step (note the ++i below);
    // the outer loop's ++i then resumes after the last key visited here.
    for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
      if (!FLAGS_reverse_iterator) {
        iter->Next();
      } else {
        // NOTE(review): Prev() moves toward smaller keys, yet the expected
        // key checked below is key i + 1; assuming generated keys sort in
        // index order, this assert only holds for forward iteration —
        // confirm.
        iter->Prev();
      }
      GenerateKeyFromInt(++i, FLAGS_num, &key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);
    }

    // Re-seek to the last key reached above.
    iter->Seek(key);
    assert(iter->Valid() && iter->key() == key);
    thread->stats.FinishedOps(nullptr, db, 1, kSeek);
  }
}
6389
// Returns true if `key` occurs in data[start..end] (both bounds inclusive),
// which must be sorted in ascending order. The bounds are clamped to the
// valid index range of `data`, so a negative `start` or an `end` past the
// last element is safe and still searches the overlapping part. Iterative,
// so no recursion depth is consumed.
bool binary_search(std::vector<int>& data, int start, int end, int key) {
  if (data.empty()) return false;
  int lo = std::max(start, 0);
  int hi = std::min(end, static_cast<int>(data.size()) - 1);
  while (lo <= hi) {
    // Written this way to avoid overflow of (lo + hi).
    int mid = lo + (hi - lo) / 2;
    if (data[mid] == key) {
      return true;
    }
    if (data[mid] > key) {
      hi = mid - 1;
    } else {
      lo = mid + 1;
    }
  }
  return false;
}
6403
// Does a bunch of merge operations for a key (key1) where each merge operand
// is a sorted list. Then it compares the performance of doing a Get for key1
// followed by searching for another key (key2) in the large merged sorted
// list, vs. calling GetMergeOperands for key1 and then searching for key2 in
// each of the sorted sub-lists. The latter is expected to be a lot faster.
GetMergeOperands(ThreadState * thread)6409 void GetMergeOperands(ThreadState* thread) {
6410 DB* db = SelectDB(thread);
6411 const int kTotalValues = 100000;
6412 const int kListSize = 100;
6413 std::string key = "my_key";
6414 std::string value;
6415
6416 for (int i = 1; i < kTotalValues; i++) {
6417 if (i % kListSize == 0) {
6418 // Remove trailing ','
6419 value.pop_back();
6420 db->Merge(WriteOptions(), key, value);
6421 value.clear();
6422 } else {
6423 value.append(std::to_string(i)).append(",");
6424 }
6425 }
6426
6427 SortList s;
6428 std::vector<int> data;
6429 // This value can be experimented with and it will demonstrate the
6430 // perf difference between doing a Get and searching for lookup_key in the
6431 // resultant large sorted list vs doing GetMergeOperands and searching
6432 // for lookup_key within this resultant sorted sub-lists.
6433 int lookup_key = 1;
6434
6435 // Get API call
6436 std::cout << "--- Get API call --- \n";
6437 PinnableSlice p_slice;
6438 uint64_t st = FLAGS_env->NowNanos();
6439 db->Get(ReadOptions(), db->DefaultColumnFamily(), key, &p_slice);
6440 s.MakeVector(data, p_slice);
6441 bool found =
6442 binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
6443 std::cout << "Found key? " << std::to_string(found) << "\n";
6444 uint64_t sp = FLAGS_env->NowNanos();
6445 std::cout << "Get: " << (sp - st) / 1000000000.0 << " seconds\n";
6446 std::string* dat_ = p_slice.GetSelf();
6447 std::cout << "Sample data from Get API call: " << dat_->substr(0, 10)
6448 << "\n";
6449 data.clear();
6450
6451 // GetMergeOperands API call
6452 std::cout << "--- GetMergeOperands API --- \n";
6453 std::vector<PinnableSlice> a_slice((kTotalValues / kListSize) + 1);
6454 st = FLAGS_env->NowNanos();
6455 int number_of_operands = 0;
6456 GetMergeOperandsOptions get_merge_operands_options;
6457 get_merge_operands_options.expected_max_number_of_operands =
6458 (kTotalValues / 100) + 1;
6459 db->GetMergeOperands(ReadOptions(), db->DefaultColumnFamily(), key,
6460 a_slice.data(), &get_merge_operands_options,
6461 &number_of_operands);
6462 for (PinnableSlice& psl : a_slice) {
6463 s.MakeVector(data, psl);
6464 found =
6465 binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
6466 data.clear();
6467 if (found) break;
6468 }
6469 std::cout << "Found key? " << std::to_string(found) << "\n";
6470 sp = FLAGS_env->NowNanos();
6471 std::cout << "Get Merge operands: " << (sp - st) / 1000000000.0
6472 << " seconds \n";
6473 int to_print = 0;
6474 std::cout << "Sample data from GetMergeOperands API call: ";
6475 for (PinnableSlice& psl : a_slice) {
6476 std::cout << "List: " << to_print << " : " << *psl.GetSelf() << "\n";
6477 if (to_print++ > 2) break;
6478 }
6479 }
6480
6481 #ifndef ROCKSDB_LITE
// This benchmark stress tests Transactions. For a given --duration (or the
// total number of --writes), a Transaction will perform a read-modify-write
// to increment the value of a key in each of N (--transaction_sets) sets of
// keys (where each set has --num keys). If --threads is set, this will be
// done in parallel.
//
// To test transactions, use --transaction_db=true (or
// --optimistic_transaction_db=true). Not setting either parameter will run
// the same benchmark without transactions.
//
// RandomTransactionVerify() will then validate the correctness of the results
// by checking if the sum of all keys in each set is the same.
RandomTransaction(ThreadState * thread)6494 void RandomTransaction(ThreadState* thread) {
6495 ReadOptions options(FLAGS_verify_checksum, true);
6496 Duration duration(FLAGS_duration, readwrites_);
6497 ReadOptions read_options(FLAGS_verify_checksum, true);
6498 uint16_t num_prefix_ranges = static_cast<uint16_t>(FLAGS_transaction_sets);
6499 uint64_t transactions_done = 0;
6500
6501 if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
6502 fprintf(stderr, "invalid value for transaction_sets\n");
6503 abort();
6504 }
6505
6506 TransactionOptions txn_options;
6507 txn_options.lock_timeout = FLAGS_transaction_lock_timeout;
6508 txn_options.set_snapshot = FLAGS_transaction_set_snapshot;
6509
6510 RandomTransactionInserter inserter(&thread->rand, write_options_,
6511 read_options, FLAGS_num,
6512 num_prefix_ranges);
6513
6514 if (FLAGS_num_multi_db > 1) {
6515 fprintf(stderr,
6516 "Cannot run RandomTransaction benchmark with "
6517 "FLAGS_multi_db > 1.");
6518 abort();
6519 }
6520
6521 while (!duration.Done(1)) {
6522 bool success;
6523
6524 // RandomTransactionInserter will attempt to insert a key for each
6525 // # of FLAGS_transaction_sets
6526 if (FLAGS_optimistic_transaction_db) {
6527 success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db);
6528 } else if (FLAGS_transaction_db) {
6529 TransactionDB* txn_db = reinterpret_cast<TransactionDB*>(db_.db);
6530 success = inserter.TransactionDBInsert(txn_db, txn_options);
6531 } else {
6532 success = inserter.DBInsert(db_.db);
6533 }
6534
6535 if (!success) {
6536 fprintf(stderr, "Unexpected error: %s\n",
6537 inserter.GetLastStatus().ToString().c_str());
6538 abort();
6539 }
6540
6541 thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers);
6542 transactions_done++;
6543 }
6544
6545 char msg[100];
6546 if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) {
6547 snprintf(msg, sizeof(msg),
6548 "( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
6549 transactions_done, inserter.GetFailureCount());
6550 } else {
6551 snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
6552 }
6553 thread->stats.AddMessage(msg);
6554
6555 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
6556 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
6557 get_perf_context()->ToString());
6558 }
6559 thread->stats.AddBytes(static_cast<int64_t>(inserter.GetBytesInserted()));
6560 }
6561
6562 // Verifies consistency of data after RandomTransaction() has been run.
6563 // Since each iteration of RandomTransaction() incremented a key in each set
6564 // by the same value, the sum of the keys in each set should be the same.
RandomTransactionVerify()6565 void RandomTransactionVerify() {
6566 if (!FLAGS_transaction_db && !FLAGS_optimistic_transaction_db) {
6567 // transactions not used, nothing to verify.
6568 return;
6569 }
6570
6571 Status s =
6572 RandomTransactionInserter::Verify(db_.db,
6573 static_cast<uint16_t>(FLAGS_transaction_sets));
6574
6575 if (s.ok()) {
6576 fprintf(stdout, "RandomTransactionVerify Success.\n");
6577 } else {
6578 fprintf(stdout, "RandomTransactionVerify FAILED!!\n");
6579 }
6580 }
6581 #endif // ROCKSDB_LITE
6582
6583 // Writes and deletes random keys without overwriting keys.
6584 //
6585 // This benchmark is intended to partially replicate the behavior of MyRocks
6586 // secondary indices: All data is stored in keys and updates happen by
6587 // deleting the old version of the key and inserting the new version.
RandomReplaceKeys(ThreadState * thread)6588 void RandomReplaceKeys(ThreadState* thread) {
6589 std::unique_ptr<const char[]> key_guard;
6590 Slice key = AllocateKey(&key_guard);
6591 std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
6592 size_t max_counter = 50;
6593 RandomGenerator gen;
6594
6595 Status s;
6596 DB* db = SelectDB(thread);
6597 for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
6598 GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
6599 s = db->Put(write_options_, key, gen.Generate());
6600 if (!s.ok()) {
6601 fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
6602 exit(1);
6603 }
6604 }
6605
6606 db->GetSnapshot();
6607
6608 std::default_random_engine generator;
6609 std::normal_distribution<double> distribution(FLAGS_numdistinct / 2.0,
6610 FLAGS_stddev);
6611 Duration duration(FLAGS_duration, FLAGS_num);
6612 while (!duration.Done(1)) {
6613 int64_t rnd_id = static_cast<int64_t>(distribution(generator));
6614 int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id),
6615 static_cast<int64_t>(0));
6616 GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
6617 &key);
6618 s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
6619 : db->Delete(write_options_, key);
6620 if (s.ok()) {
6621 counters[key_id] = (counters[key_id] + 1) % max_counter;
6622 GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
6623 &key);
6624 s = db->Put(write_options_, key, Slice());
6625 }
6626
6627 if (!s.ok()) {
6628 fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
6629 exit(1);
6630 }
6631
6632 thread->stats.FinishedOps(nullptr, db, 1, kOthers);
6633 }
6634
6635 char msg[200];
6636 snprintf(msg, sizeof(msg),
6637 "use single deletes: %d, "
6638 "standard deviation: %lf\n",
6639 FLAGS_use_single_deletes, FLAGS_stddev);
6640 thread->stats.AddMessage(msg);
6641 }
6642
TimeSeriesReadOrDelete(ThreadState * thread,bool do_deletion)6643 void TimeSeriesReadOrDelete(ThreadState* thread, bool do_deletion) {
6644 ReadOptions options(FLAGS_verify_checksum, true);
6645 int64_t read = 0;
6646 int64_t found = 0;
6647 int64_t bytes = 0;
6648
6649 Iterator* iter = nullptr;
6650 // Only work on single database
6651 assert(db_.db != nullptr);
6652 iter = db_.db->NewIterator(options);
6653
6654 std::unique_ptr<const char[]> key_guard;
6655 Slice key = AllocateKey(&key_guard);
6656
6657 char value_buffer[256];
6658 while (true) {
6659 {
6660 MutexLock l(&thread->shared->mu);
6661 if (thread->shared->num_done >= 1) {
6662 // Write thread have finished
6663 break;
6664 }
6665 }
6666 if (!FLAGS_use_tailing_iterator) {
6667 delete iter;
6668 iter = db_.db->NewIterator(options);
6669 }
6670 // Pick a Iterator to use
6671
6672 int64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
6673 GenerateKeyFromInt(key_id, FLAGS_num, &key);
6674 // Reset last 8 bytes to 0
6675 char* start = const_cast<char*>(key.data());
6676 start += key.size() - 8;
6677 memset(start, 0, 8);
6678 ++read;
6679
6680 bool key_found = false;
6681 // Seek the prefix
6682 for (iter->Seek(key); iter->Valid() && iter->key().starts_with(key);
6683 iter->Next()) {
6684 key_found = true;
6685 // Copy out iterator's value to make sure we read them.
6686 if (do_deletion) {
6687 bytes += iter->key().size();
6688 if (KeyExpired(timestamp_emulator_.get(), iter->key())) {
6689 thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
6690 db_.db->Delete(write_options_, iter->key());
6691 } else {
6692 break;
6693 }
6694 } else {
6695 bytes += iter->key().size() + iter->value().size();
6696 thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
6697 Slice value = iter->value();
6698 memcpy(value_buffer, value.data(),
6699 std::min(value.size(), sizeof(value_buffer)));
6700
6701 assert(iter->status().ok());
6702 }
6703 }
6704 found += key_found;
6705
6706 if (thread->shared->read_rate_limiter.get() != nullptr) {
6707 thread->shared->read_rate_limiter->Request(
6708 1, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
6709 }
6710 }
6711 delete iter;
6712
6713 char msg[100];
6714 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", found,
6715 read);
6716 thread->stats.AddBytes(bytes);
6717 thread->stats.AddMessage(msg);
6718 if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
6719 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
6720 get_perf_context()->ToString());
6721 }
6722 }
6723
TimeSeriesWrite(ThreadState * thread)6724 void TimeSeriesWrite(ThreadState* thread) {
6725 // Special thread that keeps writing until other threads are done.
6726 RandomGenerator gen;
6727 int64_t bytes = 0;
6728
6729 // Don't merge stats from this thread with the readers.
6730 thread->stats.SetExcludeFromMerge();
6731
6732 std::unique_ptr<RateLimiter> write_rate_limiter;
6733 if (FLAGS_benchmark_write_rate_limit > 0) {
6734 write_rate_limiter.reset(
6735 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
6736 }
6737
6738 std::unique_ptr<const char[]> key_guard;
6739 Slice key = AllocateKey(&key_guard);
6740
6741 Duration duration(FLAGS_duration, writes_);
6742 while (!duration.Done(1)) {
6743 DB* db = SelectDB(thread);
6744
6745 uint64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
6746 // Write key id
6747 GenerateKeyFromInt(key_id, FLAGS_num, &key);
6748 // Write timestamp
6749
6750 char* start = const_cast<char*>(key.data());
6751 char* pos = start + 8;
6752 int bytes_to_fill =
6753 std::min(key_size_ - static_cast<int>(pos - start), 8);
6754 uint64_t timestamp_value = timestamp_emulator_->Get();
6755 if (port::kLittleEndian) {
6756 for (int i = 0; i < bytes_to_fill; ++i) {
6757 pos[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
6758 }
6759 } else {
6760 memcpy(pos, static_cast<void*>(×tamp_value), bytes_to_fill);
6761 }
6762
6763 timestamp_emulator_->Inc();
6764
6765 Status s;
6766 Slice val = gen.Generate();
6767 s = db->Put(write_options_, key, val);
6768
6769 if (!s.ok()) {
6770 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
6771 exit(1);
6772 }
6773 bytes = key.size() + val.size();
6774 thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
6775 thread->stats.AddBytes(bytes);
6776
6777 if (FLAGS_benchmark_write_rate_limit > 0) {
6778 write_rate_limiter->Request(
6779 key.size() + val.size(), Env::IO_HIGH,
6780 nullptr /* stats */, RateLimiter::OpType::kWrite);
6781 }
6782 }
6783 }
6784
TimeSeries(ThreadState * thread)6785 void TimeSeries(ThreadState* thread) {
6786 if (thread->tid > 0) {
6787 bool do_deletion = FLAGS_expire_style == "delete" &&
6788 thread->tid <= FLAGS_num_deletion_threads;
6789 TimeSeriesReadOrDelete(thread, do_deletion);
6790 } else {
6791 TimeSeriesWrite(thread);
6792 thread->stats.Stop();
6793 thread->stats.Report("timeseries write");
6794 }
6795 }
6796
Compact(ThreadState * thread)6797 void Compact(ThreadState* thread) {
6798 DB* db = SelectDB(thread);
6799 CompactRangeOptions cro;
6800 cro.bottommost_level_compaction =
6801 BottommostLevelCompaction::kForceOptimized;
6802 db->CompactRange(cro, nullptr, nullptr);
6803 }
6804
CompactAll()6805 void CompactAll() {
6806 if (db_.db != nullptr) {
6807 db_.db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
6808 }
6809 for (const auto& db_with_cfh : multi_dbs_) {
6810 db_with_cfh.db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
6811 }
6812 }
6813
ResetStats()6814 void ResetStats() {
6815 if (db_.db != nullptr) {
6816 db_.db->ResetStats();
6817 }
6818 for (const auto& db_with_cfh : multi_dbs_) {
6819 db_with_cfh.db->ResetStats();
6820 }
6821 }
6822
PrintStatsHistory()6823 void PrintStatsHistory() {
6824 if (db_.db != nullptr) {
6825 PrintStatsHistoryImpl(db_.db, false);
6826 }
6827 for (const auto& db_with_cfh : multi_dbs_) {
6828 PrintStatsHistoryImpl(db_with_cfh.db, true);
6829 }
6830 }
6831
PrintStatsHistoryImpl(DB * db,bool print_header)6832 void PrintStatsHistoryImpl(DB* db, bool print_header) {
6833 if (print_header) {
6834 fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
6835 }
6836
6837 std::unique_ptr<StatsHistoryIterator> shi;
6838 Status s = db->GetStatsHistory(0, port::kMaxUint64, &shi);
6839 if (!s.ok()) {
6840 fprintf(stdout, "%s\n", s.ToString().c_str());
6841 return;
6842 }
6843 assert(shi);
6844 while (shi->Valid()) {
6845 uint64_t stats_time = shi->GetStatsTime();
6846 fprintf(stdout, "------ %s ------\n",
6847 TimeToHumanString(static_cast<int>(stats_time)).c_str());
6848 for (auto& entry : shi->GetStatsMap()) {
6849 fprintf(stdout, " %" PRIu64 " %s %" PRIu64 "\n", stats_time,
6850 entry.first.c_str(), entry.second);
6851 }
6852 shi->Next();
6853 }
6854 }
6855
PrintStats(const char * key)6856 void PrintStats(const char* key) {
6857 if (db_.db != nullptr) {
6858 PrintStats(db_.db, key, false);
6859 }
6860 for (const auto& db_with_cfh : multi_dbs_) {
6861 PrintStats(db_with_cfh.db, key, true);
6862 }
6863 }
6864
PrintStats(DB * db,const char * key,bool print_header=false)6865 void PrintStats(DB* db, const char* key, bool print_header = false) {
6866 if (print_header) {
6867 fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
6868 }
6869 std::string stats;
6870 if (!db->GetProperty(key, &stats)) {
6871 stats = "(failed)";
6872 }
6873 fprintf(stdout, "\n%s\n", stats.c_str());
6874 }
6875
Replay(ThreadState * thread)6876 void Replay(ThreadState* thread) {
6877 if (db_.db != nullptr) {
6878 Replay(thread, &db_);
6879 }
6880 }
6881
Replay(ThreadState *,DBWithColumnFamilies * db_with_cfh)6882 void Replay(ThreadState* /*thread*/, DBWithColumnFamilies* db_with_cfh) {
6883 Status s;
6884 std::unique_ptr<TraceReader> trace_reader;
6885 s = NewFileTraceReader(FLAGS_env, EnvOptions(), FLAGS_trace_file,
6886 &trace_reader);
6887 if (!s.ok()) {
6888 fprintf(
6889 stderr,
6890 "Encountered an error creating a TraceReader from the trace file. "
6891 "Error: %s\n",
6892 s.ToString().c_str());
6893 exit(1);
6894 }
6895 Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
6896 std::move(trace_reader));
6897 replayer.SetFastForward(
6898 static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
6899 s = replayer.MultiThreadReplay(
6900 static_cast<uint32_t>(FLAGS_trace_replay_threads));
6901 if (s.ok()) {
6902 fprintf(stdout, "Replay started from trace_file: %s\n",
6903 FLAGS_trace_file.c_str());
6904 } else {
6905 fprintf(stderr, "Starting replay failed. Error: %s\n",
6906 s.ToString().c_str());
6907 }
6908 }
6909 };
6910
// Entry point of the db_bench tool. It:
//   - parses command-line flags;
//   - converts string- and int-valued flags into the enum/object forms used
//     by the benchmarks;
//   - rejects incompatible flag combinations with exit(1);
//   - configures the Env's background thread pools and the default DB path;
//   - constructs a Benchmark and runs it.
// Returns 0 once the benchmarks have completed.
int db_bench_tool(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  // The usage message is installed only on the first call in this process.
  static bool initialized = false;
  if (!initialized) {
    SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                    " [OPTIONS]...");
    initialized = true;
  }
  ParseCommandLineFlags(&argc, &argv, true);
  FLAGS_compaction_style_e =
      (ROCKSDB_NAMESPACE::CompactionStyle)FLAGS_compaction_style;
#ifndef ROCKSDB_LITE
  // --statistics and --statistics_string both select a Statistics object;
  // only one of them may be given.
  if (FLAGS_statistics && !FLAGS_statistics_string.empty()) {
    fprintf(stderr,
            "Cannot provide both --statistics and --statistics_string.\n");
    exit(1);
  }
  if (!FLAGS_statistics_string.empty()) {
    // Look the Statistics implementation up by name in the object registry;
    // a null dbstats afterwards means nothing matched.
    Status s = ObjectRegistry::NewInstance()->NewSharedObject<Statistics>(
        FLAGS_statistics_string, &dbstats);
    if (dbstats == nullptr) {
      fprintf(stderr,
              "No Statistics registered matching string: %s status=%s\n",
              FLAGS_statistics_string.c_str(), s.ToString().c_str());
      exit(1);
    }
  }
#endif  // ROCKSDB_LITE
  if (FLAGS_statistics) {
    dbstats = ROCKSDB_NAMESPACE::CreateDBStatistics();
  }
  if (dbstats) {
    dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
  }
  FLAGS_compaction_pri_e =
      (ROCKSDB_NAMESPACE::CompactionPri)FLAGS_compaction_pri;

  // Parse the comma-separated per-level multipliers into integers.
  // NOTE(review): stoi throws on a non-numeric entry and nothing here catches
  // it.
  std::vector<std::string> fanout = ROCKSDB_NAMESPACE::StringSplit(
      FLAGS_max_bytes_for_level_multiplier_additional, ',');
  for (size_t j = 0; j < fanout.size(); j++) {
    FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
#ifndef CYGWIN
        std::stoi(fanout[j]));
#else
        stoi(fanout[j]));
#endif
  }

  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());

#ifndef ROCKSDB_LITE
  FLAGS_blob_db_compression_type_e =
    StringToCompressionType(FLAGS_blob_db_compression_type.c_str());

  // --hdfs and --env_uri both choose the Env, so they are mutually exclusive.
  if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
    fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
    exit(1);
  } else if (!FLAGS_env_uri.empty()) {
    // env_guard presumably takes ownership of a newly created Env — confirm.
    // NOTE(review): `s` is not inspected; failure is detected only through
    // FLAGS_env being null.
    Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard);
    if (FLAGS_env == nullptr) {
      fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
      exit(1);
    }
  }
#endif  // ROCKSDB_LITE
  if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) {
    fprintf(stderr,
            "`-use_existing_db` must be true for `-use_existing_keys` to be "
            "settable\n");
    exit(1);
  }

  // The HdfsEnv is never deleted in this function.
  if (!FLAGS_hdfs.empty()) {
    FLAGS_env = new ROCKSDB_NAMESPACE::HdfsEnv(FLAGS_hdfs);
  }

  // Map the fadvice name (case-insensitive) to its enum. An unknown name is
  // reported on stdout but is not fatal; FLAGS_compaction_fadvice_e is then
  // left unchanged.
  if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NONE;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NORMAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::SEQUENTIAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::WILLNEED;
  else {
    fprintf(stdout, "Unknown compaction fadvice:%s\n",
            FLAGS_compaction_fadvice.c_str());
  }

  FLAGS_value_size_distribution_type_e =
    StringToDistributionType(FLAGS_value_size_distribution_type.c_str());

  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

  // Note options sanitization may increase thread pool sizes according to
  // max_background_flushes/max_background_compactions/max_background_jobs
  FLAGS_env->SetBackgroundThreads(FLAGS_num_high_pri_threads,
                                  ROCKSDB_NAMESPACE::Env::Priority::HIGH);
  FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
                                  ROCKSDB_NAMESPACE::Env::Priority::BOTTOM);
  FLAGS_env->SetBackgroundThreads(FLAGS_num_low_pri_threads,
                                  ROCKSDB_NAMESPACE::Env::Priority::LOW);

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    FLAGS_env->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path;
  }

  if (FLAGS_stats_interval_seconds > 0) {
    // When both are set then FLAGS_stats_interval determines the frequency
    // at which the timer is checked for FLAGS_stats_interval_seconds
    FLAGS_stats_interval = 1000;
  }

  if (FLAGS_seek_missing_prefix && FLAGS_prefix_size <= 8) {
    fprintf(stderr, "prefix_size > 8 required by --seek_missing_prefix\n");
    exit(1);
  }

  ROCKSDB_NAMESPACE::Benchmark benchmark;
  benchmark.Run();

#ifndef ROCKSDB_LITE
  if (FLAGS_print_malloc_stats) {
    std::string stats_string;
    ROCKSDB_NAMESPACE::DumpMallocStats(&stats_string);
    fprintf(stdout, "Malloc stats:\n%s\n", stats_string.c_str());
  }
#endif  // ROCKSDB_LITE

  return 0;
}
7047 } // namespace ROCKSDB_NAMESPACE
7048 #endif
7049