1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase; 20 21 import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY; 22 23 import java.io.IOException; 24 import java.io.PrintStream; 25 import java.lang.reflect.Constructor; 26 import java.math.BigDecimal; 27 import java.math.MathContext; 28 import java.text.DecimalFormat; 29 import java.text.SimpleDateFormat; 30 import java.util.ArrayList; 31 import java.util.Arrays; 32 import java.util.Date; 33 import java.util.LinkedList; 34 import java.util.Map; 35 import java.util.Queue; 36 import java.util.Random; 37 import java.util.TreeMap; 38 import java.util.concurrent.Callable; 39 import java.util.concurrent.ExecutionException; 40 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.Executors; 42 import java.util.concurrent.Future; 43 44 import com.google.common.base.Objects; 45 import com.google.common.util.concurrent.ThreadFactoryBuilder; 46 47 import org.apache.commons.logging.Log; 48 import org.apache.commons.logging.LogFactory; 49 import org.apache.hadoop.conf.Configuration; 50 import org.apache.hadoop.conf.Configured; 51 import org.apache.hadoop.fs.FileSystem; 52 import org.apache.hadoop.fs.Path; 53 import org.apache.hadoop.hbase.classification.InterfaceAudience; 54 import org.apache.hadoop.hbase.client.Admin; 55 import org.apache.hadoop.hbase.client.Append; 56 import org.apache.hadoop.hbase.client.BufferedMutator; 57 import org.apache.hadoop.hbase.client.Connection; 58 import org.apache.hadoop.hbase.client.ConnectionFactory; 59 import org.apache.hadoop.hbase.client.Consistency; 60 import org.apache.hadoop.hbase.client.Delete; 61 import org.apache.hadoop.hbase.client.Durability; 62 import org.apache.hadoop.hbase.client.Get; 63 import org.apache.hadoop.hbase.client.Increment; 64 import org.apache.hadoop.hbase.client.Put; 65 import org.apache.hadoop.hbase.client.Result; 66 import org.apache.hadoop.hbase.client.ResultScanner; 67 import org.apache.hadoop.hbase.client.RowMutations; 68 import org.apache.hadoop.hbase.client.Scan; 69 import org.apache.hadoop.hbase.client.Table; 70 import org.apache.hadoop.hbase.filter.BinaryComparator; 71 import org.apache.hadoop.hbase.filter.CompareFilter; 72 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 73 import org.apache.hadoop.hbase.filter.Filter; 74 import org.apache.hadoop.hbase.filter.FilterAllFilter; 75 import org.apache.hadoop.hbase.filter.FilterList; 76 import org.apache.hadoop.hbase.filter.PageFilter; 77 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 78 import org.apache.hadoop.hbase.filter.WhileMatchFilter; 79 import org.apache.hadoop.hbase.io.compress.Compression; 80 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 81 import org.apache.hadoop.hbase.io.hfile.RandomDistribution; 82 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 83 import org.apache.hadoop.hbase.regionserver.BloomType; 84 import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; 85 import org.apache.hadoop.hbase.trace.SpanReceiverHost; 86 import org.apache.hadoop.hbase.util.*; 87 import org.apache.hadoop.io.LongWritable; 88 import org.apache.hadoop.io.Text; 89 import org.apache.hadoop.mapreduce.Job; 90 import org.apache.hadoop.mapreduce.Mapper; 91 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 92 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 93 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 94 import org.apache.hadoop.util.Tool; 95 import org.apache.hadoop.util.ToolRunner; 96 import org.codehaus.jackson.map.ObjectMapper; 97 98 import com.yammer.metrics.core.Histogram; 99 import com.yammer.metrics.stats.UniformSample; 100 import com.yammer.metrics.stats.Snapshot; 101 102 import org.apache.htrace.Sampler; 103 import org.apache.htrace.Trace; 104 import org.apache.htrace.TraceScope; 105 import org.apache.htrace.impl.ProbabilitySampler; 106 107 /** 108 * Script used evaluating HBase performance and scalability. Runs a HBase 109 * client that steps through one of a set of hardcoded tests or 'experiments' 110 * (e.g. a random reads test, a random writes test, etc.). Pass on the 111 * command-line which test to run and how many clients are participating in 112 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 113 * 114 * <p>This class sets up and runs the evaluation programs described in 115 * Section 7, <i>Performance Evaluation</i>, of the <a 116 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a> 117 * paper, pages 8-10. 118 * 119 * <p>By default, runs as a mapreduce job where each mapper runs a single test 120 * client. Can also run as a non-mapreduce, multithreaded application by 121 * specifying {@code --nomapred}. Each client does about 1GB of data, unless 122 * specified otherwise. 123 */ 124 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 125 public class PerformanceEvaluation extends Configured implements Tool { 126 private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); 127 private static final ObjectMapper MAPPER = new ObjectMapper(); 128 static { MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true)129 MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true); 130 } 131 132 public static final String TABLE_NAME = "TestTable"; 133 public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); 134 public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0); 135 public static final byte [] QUALIFIER_NAME = COLUMN_ZERO; 136 public static final int DEFAULT_VALUE_LENGTH = 1000; 137 public static final int ROW_LENGTH = 26; 138 139 private static final int ONE_GB = 1024 * 1024 * 1000; 140 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 141 // TODO : should we make this configurable 142 private static final int TAG_LENGTH = 256; 143 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 144 private static final MathContext CXT = MathContext.DECIMAL64; 145 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 146 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 147 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 148 149 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>(); 150 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 151 152 static { addCommandDescriptor(RandomReadTest.class, R, R)153 addCommandDescriptor(RandomReadTest.class, "randomRead", 154 "Run random read test"); addCommandDescriptor(RandomSeekScanTest.class, R, R)155 addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", 156 "Run random seek and scan 100 test"); addCommandDescriptor(RandomScanWithRange10Test.class, R, R)157 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 158 "Run random seek scan with both start and stop row (max 10 rows)"); addCommandDescriptor(RandomScanWithRange100Test.class, R, R)159 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 160 "Run random seek scan with both start and stop row (max 100 rows)"); addCommandDescriptor(RandomScanWithRange1000Test.class, R, R)161 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 162 "Run random seek scan with both start and stop row (max 1000 rows)"); addCommandDescriptor(RandomScanWithRange10000Test.class, R, R)163 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 164 "Run random seek scan with both start and stop row (max 10000 rows)"); addCommandDescriptor(RandomWriteTest.class, R, R)165 addCommandDescriptor(RandomWriteTest.class, "randomWrite", 166 "Run random write test"); addCommandDescriptor(SequentialReadTest.class, R, R)167 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", 168 "Run sequential read test"); addCommandDescriptor(SequentialWriteTest.class, R, R)169 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", 170 "Run sequential write test"); addCommandDescriptor(ScanTest.class, R, R)171 addCommandDescriptor(ScanTest.class, "scan", 172 "Run scan test (read every row)"); addCommandDescriptor(FilteredScanTest.class, R, R + R)173 addCommandDescriptor(FilteredScanTest.class, "filterScan", 174 "Run scan test using a filter to find a specific row based on it's value " + 175 "(make sure to use --rows=20)"); addCommandDescriptor(IncrementTest.class, R, R)176 addCommandDescriptor(IncrementTest.class, "increment", 177 "Increment on each row; clients overlap on keyspace so some concurrent operations"); addCommandDescriptor(AppendTest.class, R, R)178 addCommandDescriptor(AppendTest.class, "append", 179 "Append on each row; clients overlap on keyspace so some concurrent operations"); addCommandDescriptor(CheckAndMutateTest.class, R, R)180 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 181 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); addCommandDescriptor(CheckAndPutTest.class, R, R)182 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 183 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); addCommandDescriptor(CheckAndDeleteTest.class, R, R)184 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 185 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 186 } 187 188 /** 189 * Enum for map metrics. Keep it out here rather than inside in the Map 190 * inner-class so we can find associated properties. 191 */ 192 protected static enum Counter { 193 /** elapsed time */ 194 ELAPSED_TIME, 195 /** number of rows */ 196 ROWS 197 } 198 199 protected static class RunResult implements Comparable<RunResult> { RunResult(long duration, Histogram hist)200 public RunResult(long duration, Histogram hist) { 201 this.duration = duration; 202 this.hist = hist; 203 } 204 205 public final long duration; 206 public final Histogram hist; 207 208 @Override toString()209 public String toString() { 210 return Long.toString(duration); 211 } 212 compareTo(RunResult o)213 @Override public int compareTo(RunResult o) { 214 return Long.compare(this.duration, o.duration); 215 } 216 } 217 218 /** 219 * Constructor 220 * @param conf Configuration object 221 */ PerformanceEvaluation(final Configuration conf)222 public PerformanceEvaluation(final Configuration conf) { 223 super(conf); 224 } 225 addCommandDescriptor(Class<? extends Test> cmdClass, String name, String description)226 protected static void addCommandDescriptor(Class<? extends Test> cmdClass, 227 String name, String description) { 228 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 229 COMMANDS.put(name, cmdDescriptor); 230 } 231 232 /** 233 * Implementations can have their status set. 234 */ 235 interface Status { 236 /** 237 * Sets status 238 * @param msg status message 239 * @throws IOException 240 */ setStatus(final String msg)241 void setStatus(final String msg) throws IOException; 242 } 243 244 /** 245 * MapReduce job that runs a performance evaluation client in each map task. 246 */ 247 public static class EvaluationMapTask 248 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 249 250 /** configuration parameter name that contains the command */ 251 public final static String CMD_KEY = "EvaluationMapTask.command"; 252 /** configuration parameter name that contains the PE impl */ 253 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 254 255 private Class<? extends Test> cmd; 256 257 @Override setup(Context context)258 protected void setup(Context context) throws IOException, InterruptedException { 259 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 260 261 // this is required so that extensions of PE are instantiated within the 262 // map reduce task... 263 Class<? extends PerformanceEvaluation> peClass = 264 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 265 try { 266 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 267 } catch (Exception e) { 268 throw new IllegalStateException("Could not instantiate PE instance", e); 269 } 270 } 271 forName(String className, Class<Type> type)272 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 273 try { 274 return Class.forName(className).asSubclass(type); 275 } catch (ClassNotFoundException e) { 276 throw new IllegalStateException("Could not find class for name: " + className, e); 277 } 278 } 279 280 @Override map(LongWritable key, Text value, final Context context)281 protected void map(LongWritable key, Text value, final Context context) 282 throws IOException, InterruptedException { 283 284 Status status = new Status() { 285 @Override 286 public void setStatus(String msg) { 287 context.setStatus(msg); 288 } 289 }; 290 291 ObjectMapper mapper = new ObjectMapper(); 292 TestOptions opts = mapper.readValue(value.toString(), TestOptions.class); 293 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 294 final Connection con = ConnectionFactory.createConnection(conf); 295 296 // Evaluation task 297 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status); 298 // Collect how much time the thing took. Report as map output and 299 // to the ELAPSED_TIME counter. 300 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 301 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 302 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 303 context.progress(); 304 } 305 } 306 307 /* 308 * If table does not already exist, create. Also create a table when 309 * {@code opts.presplitRegions} is specified or when the existing table's 310 * region replica count doesn't match {@code opts.replicas}. 311 */ checkTable(Admin admin, TestOptions opts)312 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 313 TableName tableName = TableName.valueOf(opts.tableName); 314 boolean needsDelete = false, exists = admin.tableExists(tableName); 315 boolean isReadCmd = opts.cmdName.toLowerCase().contains("read") 316 || opts.cmdName.toLowerCase().contains("scan"); 317 if (!exists && isReadCmd) { 318 throw new IllegalStateException( 319 "Must specify an existing table for read commands. Run a write command first."); 320 } 321 HTableDescriptor desc = 322 exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; 323 byte[][] splits = getSplits(opts); 324 325 // recreate the table when user has requested presplit or when existing 326 // {RegionSplitPolicy,replica count} does not match requested. 327 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 328 || (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy) 329 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) { 330 needsDelete = true; 331 // wait, why did it delete my table?!? 332 LOG.debug(Objects.toStringHelper("needsDelete") 333 .add("needsDelete", needsDelete) 334 .add("isReadCmd", isReadCmd) 335 .add("exists", exists) 336 .add("desc", desc) 337 .add("presplit", opts.presplitRegions) 338 .add("splitPolicy", opts.splitPolicy) 339 .add("replicas", opts.replicas)); 340 } 341 342 // remove an existing table 343 if (needsDelete) { 344 if (admin.isTableEnabled(tableName)) { 345 admin.disableTable(tableName); 346 } 347 admin.deleteTable(tableName); 348 } 349 350 // table creation is necessary 351 if (!exists || needsDelete) { 352 desc = getTableDescriptor(opts); 353 if (splits != null) { 354 if (LOG.isDebugEnabled()) { 355 for (int i = 0; i < splits.length; i++) { 356 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 357 } 358 } 359 } 360 admin.createTable(desc, splits); 361 LOG.info("Table " + desc + " created"); 362 } 363 return admin.tableExists(tableName); 364 } 365 366 /** 367 * Create an HTableDescriptor from provided TestOptions. 368 */ getTableDescriptor(TestOptions opts)369 protected static HTableDescriptor getTableDescriptor(TestOptions opts) { 370 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(opts.tableName)); 371 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); 372 family.setDataBlockEncoding(opts.blockEncoding); 373 family.setCompressionType(opts.compression); 374 family.setBloomFilterType(opts.bloomType); 375 if (opts.inMemoryCF) { 376 family.setInMemory(true); 377 } 378 desc.addFamily(family); 379 if (opts.replicas != DEFAULT_OPTS.replicas) { 380 desc.setRegionReplication(opts.replicas); 381 } 382 if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) { 383 desc.setRegionSplitPolicyClassName(opts.splitPolicy); 384 } 385 return desc; 386 } 387 388 /** 389 * generates splits based on total number of rows and specified split regions 390 */ getSplits(TestOptions opts)391 protected static byte[][] getSplits(TestOptions opts) { 392 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) 393 return null; 394 395 int numSplitPoints = opts.presplitRegions - 1; 396 byte[][] splits = new byte[numSplitPoints][]; 397 int jump = opts.totalRows / opts.presplitRegions; 398 for (int i = 0; i < numSplitPoints; i++) { 399 int rowkey = jump * (1 + i); 400 splits[i] = format(rowkey); 401 } 402 return splits; 403 } 404 405 /* 406 * Run all clients in this vm each to its own thread. 407 */ doLocalClients(final TestOptions opts, final Configuration conf)408 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 409 throws IOException, InterruptedException { 410 final Class<? extends Test> cmd = determineCommandClass(opts.cmdName); 411 assert cmd != null; 412 @SuppressWarnings("unchecked") 413 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 414 RunResult[] results = new RunResult[opts.numClientThreads]; 415 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 416 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 417 final Connection con = ConnectionFactory.createConnection(conf); 418 for (int i = 0; i < threads.length; i++) { 419 final int index = i; 420 threads[i] = pool.submit(new Callable<RunResult>() { 421 @Override 422 public RunResult call() throws Exception { 423 TestOptions threadOpts = new TestOptions(opts); 424 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 425 RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() { 426 @Override 427 public void setStatus(final String msg) throws IOException { 428 LOG.info(msg); 429 } 430 }); 431 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration + 432 "ms over " + threadOpts.perClientRunRows + " rows"); 433 return run; 434 } 435 }); 436 } 437 pool.shutdown(); 438 439 for (int i = 0; i < threads.length; i++) { 440 try { 441 results[i] = threads[i].get(); 442 } catch (ExecutionException e) { 443 throw new IOException(e.getCause()); 444 } 445 } 446 final String test = cmd.getSimpleName(); 447 LOG.info("[" + test + "] Summary of timings (ms): " 448 + Arrays.toString(results)); 449 Arrays.sort(results); 450 long total = 0; 451 for (RunResult result : results) { 452 total += result.duration; 453 } 454 LOG.info("[" + test + "]" 455 + "\tMin: " + results[0] + "ms" 456 + "\tMax: " + results[results.length - 1] + "ms" 457 + "\tAvg: " + (total / results.length) + "ms"); 458 459 con.close(); 460 461 return results; 462 } 463 464 /* 465 * Run a mapreduce job. Run as many maps as asked-for clients. 466 * Before we start up the job, write out an input file with instruction 467 * per client regards which row they are to start on. 468 * @param cmd Command to run. 469 * @throws IOException 470 */ doMapReduce(TestOptions opts, final Configuration conf)471 static Job doMapReduce(TestOptions opts, final Configuration conf) 472 throws IOException, InterruptedException, ClassNotFoundException { 473 final Class<? extends Test> cmd = determineCommandClass(opts.cmdName); 474 assert cmd != null; 475 Path inputDir = writeInputFile(conf, opts); 476 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 477 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 478 Job job = Job.getInstance(conf); 479 job.setJarByClass(PerformanceEvaluation.class); 480 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 481 482 job.setInputFormatClass(NLineInputFormat.class); 483 NLineInputFormat.setInputPaths(job, inputDir); 484 // this is default, but be explicit about it just in case. 485 NLineInputFormat.setNumLinesPerSplit(job, 1); 486 487 job.setOutputKeyClass(LongWritable.class); 488 job.setOutputValueClass(LongWritable.class); 489 490 job.setMapperClass(EvaluationMapTask.class); 491 job.setReducerClass(LongSumReducer.class); 492 493 job.setNumReduceTasks(1); 494 495 job.setOutputFormatClass(TextOutputFormat.class); 496 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 497 498 TableMapReduceUtil.addDependencyJars(job); 499 TableMapReduceUtil.addDependencyJars(job.getConfiguration(), 500 Histogram.class, // yammer metrics 501 ObjectMapper.class); // jackson-mapper-asl 502 503 TableMapReduceUtil.initCredentials(job); 504 505 job.waitForCompletion(true); 506 return job; 507 } 508 509 /* 510 * Write input file of offsets-per-client for the mapreduce job. 511 * @param c Configuration 512 * @return Directory that contains file written. 513 * @throws IOException 514 */ writeInputFile(final Configuration c, final TestOptions opts)515 private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 516 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 517 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); 518 Path inputDir = new Path(jobdir, "inputs"); 519 520 FileSystem fs = FileSystem.get(c); 521 fs.mkdirs(inputDir); 522 523 Path inputFile = new Path(inputDir, "input.txt"); 524 PrintStream out = new PrintStream(fs.create(inputFile)); 525 // Make input random. 526 Map<Integer, String> m = new TreeMap<Integer, String>(); 527 Hash h = MurmurHash.getInstance(); 528 int perClientRows = (opts.totalRows / opts.numClientThreads); 529 try { 530 for (int i = 0; i < 10; i++) { 531 for (int j = 0; j < opts.numClientThreads; j++) { 532 TestOptions next = new TestOptions(opts); 533 next.startRow = (j * perClientRows) + (i * (perClientRows/10)); 534 next.perClientRunRows = perClientRows / 10; 535 String s = MAPPER.writeValueAsString(next); 536 LOG.info("maptask input=" + s); 537 int hash = h.hash(Bytes.toBytes(s)); 538 m.put(hash, s); 539 } 540 } 541 for (Map.Entry<Integer, String> e: m.entrySet()) { 542 out.println(e.getValue()); 543 } 544 } finally { 545 out.close(); 546 } 547 return inputDir; 548 } 549 550 /** 551 * Describes a command. 552 */ 553 static class CmdDescriptor { 554 private Class<? extends Test> cmdClass; 555 private String name; 556 private String description; 557 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description)558 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) { 559 this.cmdClass = cmdClass; 560 this.name = name; 561 this.description = description; 562 } 563 getCmdClass()564 public Class<? extends Test> getCmdClass() { 565 return cmdClass; 566 } 567 getName()568 public String getName() { 569 return name; 570 } 571 getDescription()572 public String getDescription() { 573 return description; 574 } 575 } 576 577 /** 578 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. 579 * This makes tracking all these arguments a little easier. 580 * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON 581 * serialization of this TestOptions class behave), and you need to add to the clone constructor 582 * below copying your new option from the 'that' to the 'this'. Look for 'clone' below. 583 */ 584 static class TestOptions { 585 String cmdName = null; 586 boolean nomapred = false; 587 boolean filterAll = false; 588 int startRow = 0; 589 float size = 1.0f; 590 int perClientRunRows = DEFAULT_ROWS_PER_GB; 591 int numClientThreads = 1; 592 int totalRows = DEFAULT_ROWS_PER_GB; 593 float sampleRate = 1.0f; 594 double traceRate = 0.0; 595 String tableName = TABLE_NAME; 596 boolean flushCommits = true; 597 boolean writeToWAL = true; 598 boolean autoFlush = false; 599 boolean oneCon = false; 600 boolean useTags = false; 601 int noOfTags = 1; 602 boolean reportLatency = false; 603 int multiGet = 0; 604 int randomSleep = 0; 605 boolean inMemoryCF = false; 606 int presplitRegions = 0; 607 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; 608 String splitPolicy = null; 609 Compression.Algorithm compression = Compression.Algorithm.NONE; 610 BloomType bloomType = BloomType.ROW; 611 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 612 boolean valueRandom = false; 613 boolean valueZipf = false; 614 int valueSize = DEFAULT_VALUE_LENGTH; 615 int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10; 616 int columns = 1; 617 int caching = 30; 618 boolean addColumns = true; 619 TestOptions()620 public TestOptions() {} 621 622 /** 623 * Clone constructor. 624 * @param that Object to copy from. 625 */ TestOptions(TestOptions that)626 public TestOptions(TestOptions that) { 627 this.cmdName = that.cmdName; 628 this.nomapred = that.nomapred; 629 this.startRow = that.startRow; 630 this.size = that.size; 631 this.perClientRunRows = that.perClientRunRows; 632 this.numClientThreads = that.numClientThreads; 633 this.totalRows = that.totalRows; 634 this.sampleRate = that.sampleRate; 635 this.traceRate = that.traceRate; 636 this.tableName = that.tableName; 637 this.flushCommits = that.flushCommits; 638 this.writeToWAL = that.writeToWAL; 639 this.autoFlush = that.autoFlush; 640 this.oneCon = that.oneCon; 641 this.useTags = that.useTags; 642 this.noOfTags = that.noOfTags; 643 this.reportLatency = that.reportLatency; 644 this.multiGet = that.multiGet; 645 this.inMemoryCF = that.inMemoryCF; 646 this.presplitRegions = that.presplitRegions; 647 this.replicas = that.replicas; 648 this.splitPolicy = that.splitPolicy; 649 this.compression = that.compression; 650 this.blockEncoding = that.blockEncoding; 651 this.filterAll = that.filterAll; 652 this.bloomType = that.bloomType; 653 this.valueRandom = that.valueRandom; 654 this.valueZipf = that.valueZipf; 655 this.valueSize = that.valueSize; 656 this.period = that.period; 657 this.randomSleep = that.randomSleep; 658 this.addColumns = that.addColumns; 659 this.columns = that.columns; 660 this.caching = that.caching; 661 } 662 getCaching()663 public int getCaching() { 664 return this.caching; 665 } 666 setCaching(final int caching)667 public void setCaching(final int caching) { 668 this.caching = caching; 669 } 670 getColumns()671 public int getColumns() { 672 return this.columns; 673 } 674 setColumns(final int columns)675 public void setColumns(final int columns) { 676 this.columns = columns; 677 } 678 isValueZipf()679 public boolean isValueZipf() { 680 return valueZipf; 681 } 682 setValueZipf(boolean valueZipf)683 public void setValueZipf(boolean valueZipf) { 684 this.valueZipf = valueZipf; 685 } 686 getCmdName()687 public String getCmdName() { 688 return cmdName; 689 } 690 setCmdName(String cmdName)691 public void setCmdName(String cmdName) { 692 this.cmdName = cmdName; 693 } 694 getRandomSleep()695 public int getRandomSleep() { 696 return randomSleep; 697 } 698 setRandomSleep(int randomSleep)699 public void setRandomSleep(int randomSleep) { 700 this.randomSleep = randomSleep; 701 } 702 getReplicas()703 public int getReplicas() { 704 return replicas; 705 } 706 setReplicas(int replicas)707 public void setReplicas(int replicas) { 708 this.replicas = replicas; 709 } 710 getSplitPolicy()711 public String getSplitPolicy() { 712 return splitPolicy; 713 } 714 setSplitPolicy(String splitPolicy)715 public void setSplitPolicy(String splitPolicy) { 716 this.splitPolicy = splitPolicy; 717 } 718 setNomapred(boolean nomapred)719 public void setNomapred(boolean nomapred) { 720 this.nomapred = nomapred; 721 } 722 setFilterAll(boolean filterAll)723 public void setFilterAll(boolean filterAll) { 724 this.filterAll = filterAll; 725 } 726 setStartRow(int startRow)727 public void setStartRow(int startRow) { 728 this.startRow = startRow; 729 } 730 setSize(float size)731 public void setSize(float size) { 732 this.size = size; 733 } 734 setPerClientRunRows(int perClientRunRows)735 public void setPerClientRunRows(int perClientRunRows) { 736 this.perClientRunRows = perClientRunRows; 737 } 738 setNumClientThreads(int numClientThreads)739 public void setNumClientThreads(int numClientThreads) { 740 this.numClientThreads = numClientThreads; 741 } 742 setTotalRows(int totalRows)743 public void setTotalRows(int totalRows) { 744 this.totalRows = totalRows; 745 } 746 setSampleRate(float sampleRate)747 public void setSampleRate(float sampleRate) { 748 this.sampleRate = sampleRate; 749 } 750 setTraceRate(double traceRate)751 public void setTraceRate(double traceRate) { 752 this.traceRate = traceRate; 753 } 754 setTableName(String tableName)755 public void setTableName(String tableName) { 756 this.tableName = tableName; 757 } 758 setFlushCommits(boolean flushCommits)759 public void setFlushCommits(boolean flushCommits) { 760 this.flushCommits = flushCommits; 761 } 762 setWriteToWAL(boolean writeToWAL)763 public void setWriteToWAL(boolean writeToWAL) { 764 this.writeToWAL = writeToWAL; 765 } 766 setAutoFlush(boolean autoFlush)767 public void setAutoFlush(boolean autoFlush) { 768 this.autoFlush = autoFlush; 769 } 770 setOneCon(boolean oneCon)771 public void setOneCon(boolean oneCon) { 772 this.oneCon = oneCon; 773 } 774 setUseTags(boolean useTags)775 public void setUseTags(boolean useTags) { 776 this.useTags = useTags; 777 } 778 setNoOfTags(int noOfTags)779 public void setNoOfTags(int noOfTags) { 780 this.noOfTags = noOfTags; 781 } 782 setReportLatency(boolean reportLatency)783 public void setReportLatency(boolean reportLatency) { 784 this.reportLatency = reportLatency; 785 } 786 setMultiGet(int multiGet)787 public void setMultiGet(int multiGet) { 788 this.multiGet = multiGet; 789 } 790 setInMemoryCF(boolean inMemoryCF)791 public void setInMemoryCF(boolean inMemoryCF) { 792 this.inMemoryCF = inMemoryCF; 793 } 794 setPresplitRegions(int presplitRegions)795 public void setPresplitRegions(int presplitRegions) { 796 this.presplitRegions = presplitRegions; 797 } 798 setCompression(Compression.Algorithm compression)799 public void setCompression(Compression.Algorithm compression) { 800 this.compression = compression; 801 } 802 setBloomType(BloomType bloomType)803 public void setBloomType(BloomType bloomType) { 804 this.bloomType = bloomType; 805 } 806 setBlockEncoding(DataBlockEncoding blockEncoding)807 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 808 this.blockEncoding = blockEncoding; 809 } 810 setValueRandom(boolean valueRandom)811 public void setValueRandom(boolean valueRandom) { 812 this.valueRandom = valueRandom; 813 } 814 setValueSize(int valueSize)815 public void setValueSize(int valueSize) { 816 this.valueSize = valueSize; 817 } 818 setPeriod(int period)819 public void setPeriod(int period) { 820 this.period = period; 821 } 822 isNomapred()823 public boolean isNomapred() { 824 return nomapred; 825 } 826 isFilterAll()827 public boolean isFilterAll() { 828 return filterAll; 829 } 830 getStartRow()831 public int getStartRow() { 832 return startRow; 833 } 834 getSize()835 public float getSize() { 836 return size; 837 } 838 getPerClientRunRows()839 public int getPerClientRunRows() { 840 return perClientRunRows; 841 } 842 getNumClientThreads()843 public int getNumClientThreads() { 844 return numClientThreads; 845 } 846 getTotalRows()847 public int getTotalRows() { 848 return totalRows; 849 } 850 getSampleRate()851 public float getSampleRate() { 852 return sampleRate; 853 } 854 getTraceRate()855 public double getTraceRate() { 856 return traceRate; 857 } 858 getTableName()859 public String getTableName() { 860 return tableName; 861 } 862 isFlushCommits()863 public boolean isFlushCommits() { 864 return flushCommits; 865 } 866 isWriteToWAL()867 public boolean isWriteToWAL() { 868 return writeToWAL; 869 } 870 isAutoFlush()871 public boolean isAutoFlush() { 872 return autoFlush; 873 } 874 isUseTags()875 public boolean isUseTags() { 876 return useTags; 877 } 878 getNoOfTags()879 public int getNoOfTags() { 880 return noOfTags; 881 } 882 isReportLatency()883 public boolean isReportLatency() { 884 return reportLatency; 885 } 886 getMultiGet()887 public int getMultiGet() { 888 return multiGet; 889 } 890 isInMemoryCF()891 public boolean isInMemoryCF() { 892 return inMemoryCF; 893 } 894 getPresplitRegions()895 public int getPresplitRegions() { 896 return presplitRegions; 897 } 898 getCompression()899 public Compression.Algorithm getCompression() { 900 return compression; 901 } 902 getBlockEncoding()903 public DataBlockEncoding getBlockEncoding() { 904 return blockEncoding; 905 } 906 isValueRandom()907 public boolean isValueRandom() { 908 return valueRandom; 909 } 910 getValueSize()911 public int getValueSize() { 912 return valueSize; 913 } 914 getPeriod()915 public int getPeriod() { 916 return period; 917 } 918 getBloomType()919 public BloomType getBloomType() { 920 return bloomType; 921 } 922 isOneCon()923 public boolean isOneCon() { 924 return oneCon; 925 } 926 getAddColumns()927 public boolean getAddColumns() { 928 return addColumns; 929 } 930 setAddColumns(boolean addColumns)931 public void setAddColumns(boolean addColumns) { 932 this.addColumns = addColumns; 933 } 934 } 935 936 /* 937 * A test. 938 * Subclass to particularize what happens per row. 939 */ 940 static abstract class Test { 941 // Below is make it so when Tests are all running in the one 942 // jvm, that they each have a differently seeded Random. 943 private static final Random randomSeed = new Random(System.currentTimeMillis()); 944 nextRandomSeed()945 private static long nextRandomSeed() { 946 return randomSeed.nextLong(); 947 } 948 private final int everyN; 949 950 protected final Random rand = new Random(nextRandomSeed()); 951 protected final Configuration conf; 952 protected final TestOptions opts; 953 954 private final Status status; 955 private final Sampler<?> traceSampler; 956 private final SpanReceiverHost receiverHost; 957 protected Connection connection; 958 959 private String testName; 960 private Histogram latency; 961 private Histogram valueSize; 962 private RandomDistribution.Zipf zipf; 963 964 /** 965 * Note that all subclasses of this class must provide a public constructor 966 * that has the exact same list of arguments. 967 */ Test(final Connection con, final TestOptions options, final Status status)968 Test(final Connection con, final TestOptions options, final Status status) { 969 this.connection = con; 970 this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration(); 971 this.opts = options; 972 this.status = status; 973 this.testName = this.getClass().getSimpleName(); 974 receiverHost = SpanReceiverHost.getInstance(conf); 975 if (options.traceRate >= 1.0) { 976 this.traceSampler = Sampler.ALWAYS; 977 } else if (options.traceRate > 0.0) { 978 conf.setDouble("hbase.sampler.fraction", options.traceRate); 979 this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf)); 980 } else { 981 this.traceSampler = Sampler.NEVER; 982 } 983 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 984 if (options.isValueZipf()) { 985 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.1); 986 } 987 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 988 } 989 getValueLength(final Random r)990 int getValueLength(final Random r) { 991 if (this.opts.isValueRandom()) return Math.abs(r.nextInt() % opts.valueSize); 992 else if (this.opts.isValueZipf()) return Math.abs(this.zipf.nextInt()); 993 else return opts.valueSize; 994 } 995 updateValueSize(final Result [] rs)996 void updateValueSize(final Result [] rs) throws IOException { 997 if (rs == null || !isRandomValueSize()) return; 998 for (Result r: rs) updateValueSize(r); 999 } 1000 updateValueSize(final Result r)1001 void updateValueSize(final Result r) throws IOException { 1002 if (r == null || !isRandomValueSize()) return; 1003 int size = 0; 1004 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1005 size += scanner.current().getValueLength(); 1006 } 1007 updateValueSize(size); 1008 } 1009 updateValueSize(final int valueSize)1010 void updateValueSize(final int valueSize) { 1011 if (!isRandomValueSize()) return; 1012 this.valueSize.update(valueSize); 1013 } 1014 generateStatus(final int sr, final int i, final int lr)1015 String generateStatus(final int sr, final int i, final int lr) { 1016 return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() + 1017 (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport()); 1018 } 1019 isRandomValueSize()1020 boolean isRandomValueSize() { 1021 return opts.valueRandom; 1022 } 1023 getReportingPeriod()1024 protected int getReportingPeriod() { 1025 return opts.period; 1026 } 1027 1028 /** 1029 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1030 */ getLatency()1031 public Histogram getLatency() { 1032 return latency; 1033 } 1034 testSetup()1035 void testSetup() throws IOException { 1036 if (!opts.oneCon) { 1037 this.connection = ConnectionFactory.createConnection(conf); 1038 } 1039 onStartup(); 1040 latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); 1041 valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); 1042 } 1043 onStartup()1044 abstract void onStartup() throws IOException; 1045 testTakedown()1046 void testTakedown() throws IOException { 1047 reportLatency(); 1048 reportValueSize(); 1049 onTakedown(); 1050 if (!opts.oneCon) { 1051 connection.close(); 1052 } 1053 receiverHost.closeReceivers(); 1054 } 1055 onTakedown()1056 abstract void onTakedown() throws IOException; 1057 1058 /* 1059 * Run test 1060 * @return Elapsed time. 1061 * @throws IOException 1062 */ test()1063 long test() throws IOException, InterruptedException { 1064 testSetup(); 1065 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1066 final long startTime = System.nanoTime(); 1067 try { 1068 testTimed(); 1069 } finally { 1070 testTakedown(); 1071 } 1072 return (System.nanoTime() - startTime) / 1000000; 1073 } 1074 getStartRow()1075 int getStartRow() { 1076 return opts.startRow; 1077 } 1078 getLastRow()1079 int getLastRow() { 1080 return getStartRow() + opts.perClientRunRows; 1081 } 1082 1083 /** 1084 * Provides an extension point for tests that don't want a per row invocation. 1085 */ testTimed()1086 void testTimed() throws IOException, InterruptedException { 1087 int startRow = getStartRow(); 1088 int lastRow = getLastRow(); 1089 // Report on completion of 1/10th of total. 1090 for (int i = startRow; i < lastRow; i++) { 1091 if (i % everyN != 0) continue; 1092 long startTime = System.nanoTime(); 1093 TraceScope scope = Trace.startSpan("test row", traceSampler); 1094 try { 1095 testRow(i); 1096 } finally { 1097 scope.close(); 1098 } 1099 latency.update((System.nanoTime() - startTime) / 1000); 1100 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1101 status.setStatus(generateStatus(startRow, i, lastRow)); 1102 } 1103 } 1104 } 1105 1106 /** 1107 * report percentiles of latency 1108 * @throws IOException 1109 */ reportLatency()1110 private void reportLatency() throws IOException { 1111 status.setStatus(testName + " latency log (microseconds), on " + 1112 latency.count() + " measures"); 1113 reportHistogram(this.latency); 1114 } 1115 reportValueSize()1116 private void reportValueSize() throws IOException { 1117 status.setStatus(testName + " valueSize after " + 1118 valueSize.count() + " measures"); 1119 reportHistogram(this.valueSize); 1120 } 1121 reportHistogram(final Histogram h)1122 private void reportHistogram(final Histogram h) throws IOException { 1123 Snapshot sn = h.getSnapshot(); 1124 status.setStatus(testName + " Min = " + h.min()); 1125 status.setStatus(testName + " Avg = " + h.mean()); 1126 status.setStatus(testName + " StdDev = " + h.stdDev()); 1127 status.setStatus(testName + " 50th = " + sn.getMedian()); 1128 status.setStatus(testName + " 75th = " + sn.get75thPercentile()); 1129 status.setStatus(testName + " 95th = " + sn.get95thPercentile()); 1130 status.setStatus(testName + " 99th = " + sn.get99thPercentile()); 1131 status.setStatus(testName + " 99.9th = " + sn.get999thPercentile()); 1132 status.setStatus(testName + " 99.99th = " + sn.getValue(0.9999)); 1133 status.setStatus(testName + " 99.999th = " + sn.getValue(0.99999)); 1134 status.setStatus(testName + " Max = " + h.max()); 1135 } 1136 1137 /** 1138 * @return Subset of the histograms' calculation. 1139 */ getShortLatencyReport()1140 public String getShortLatencyReport() { 1141 return YammerHistogramUtils.getShortHistogramReport(this.latency); 1142 } 1143 1144 /** 1145 * @return Subset of the histograms' calculation. 1146 */ getShortValueSizeReport()1147 public String getShortValueSizeReport() { 1148 return YammerHistogramUtils.getShortHistogramReport(this.valueSize); 1149 } 1150 1151 /* 1152 * Test for individual row. 1153 * @param i Row index. 1154 */ testRow(final int i)1155 abstract void testRow(final int i) throws IOException, InterruptedException; 1156 } 1157 1158 static abstract class TableTest extends Test { 1159 protected Table table; 1160 TableTest(Connection con, TestOptions options, Status status)1161 TableTest(Connection con, TestOptions options, Status status) { 1162 super(con, options, status); 1163 } 1164 1165 @Override onStartup()1166 void onStartup() throws IOException { 1167 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1168 } 1169 1170 @Override onTakedown()1171 void onTakedown() throws IOException { 1172 table.close(); 1173 } 1174 } 1175 1176 static abstract class BufferedMutatorTest extends Test { 1177 protected BufferedMutator mutator; 1178 BufferedMutatorTest(Connection con, TestOptions options, Status status)1179 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1180 super(con, options, status); 1181 } 1182 1183 @Override onStartup()1184 void onStartup() throws IOException { 1185 this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); 1186 } 1187 1188 @Override onTakedown()1189 void onTakedown() throws IOException { 1190 mutator.close(); 1191 } 1192 } 1193 1194 static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Connection con, TestOptions options, Status status)1195 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1196 super(con, options, status); 1197 } 1198 1199 @Override testRow(final int i)1200 void testRow(final int i) throws IOException { 1201 Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows)); 1202 scan.setCaching(opts.caching); 1203 FilterList list = new FilterList(); 1204 if (opts.addColumns) { 1205 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1206 } else { 1207 scan.addFamily(FAMILY_NAME); 1208 } 1209 if (opts.filterAll) { 1210 list.addFilter(new FilterAllFilter()); 1211 } 1212 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1213 scan.setFilter(list); 1214 ResultScanner s = this.table.getScanner(scan); 1215 for (Result rr; (rr = s.next()) != null;) { 1216 updateValueSize(rr); 1217 } 1218 s.close(); 1219 } 1220 1221 @Override getReportingPeriod()1222 protected int getReportingPeriod() { 1223 int period = opts.perClientRunRows / 100; 1224 return period == 0 ? opts.perClientRunRows : period; 1225 } 1226 1227 } 1228 1229 static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Connection con, TestOptions options, Status status)1230 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1231 super(con, options, status); 1232 } 1233 1234 @Override testRow(final int i)1235 void testRow(final int i) throws IOException { 1236 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1237 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); 1238 scan.setCaching(opts.caching); 1239 if (opts.filterAll) { 1240 scan.setFilter(new FilterAllFilter()); 1241 } 1242 if (opts.addColumns) { 1243 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1244 } else { 1245 scan.addFamily(FAMILY_NAME); 1246 } 1247 Result r = null; 1248 int count = 0; 1249 ResultScanner s = this.table.getScanner(scan); 1250 for (; (r = s.next()) != null;) { 1251 updateValueSize(r); 1252 count++; 1253 } 1254 if (i % 100 == 0) { 1255 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1256 Bytes.toString(startAndStopRow.getFirst()), 1257 Bytes.toString(startAndStopRow.getSecond()), count)); 1258 } 1259 1260 s.close(); 1261 } 1262 getStartAndStopRow()1263 protected abstract Pair<byte[],byte[]> getStartAndStopRow(); 1264 generateStartAndStopRows(int maxRange)1265 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1266 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1267 int stop = start + maxRange; 1268 return new Pair<byte[],byte[]>(format(start), format(stop)); 1269 } 1270 1271 @Override getReportingPeriod()1272 protected int getReportingPeriod() { 1273 int period = opts.perClientRunRows / 100; 1274 return period == 0? opts.perClientRunRows: period; 1275 } 1276 } 1277 1278 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { RandomScanWithRange10Test(Connection con, TestOptions options, Status status)1279 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1280 super(con, options, status); 1281 } 1282 1283 @Override getStartAndStopRow()1284 protected Pair<byte[], byte[]> getStartAndStopRow() { 1285 return generateStartAndStopRows(10); 1286 } 1287 } 1288 1289 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { RandomScanWithRange100Test(Connection con, TestOptions options, Status status)1290 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1291 super(con, options, status); 1292 } 1293 1294 @Override getStartAndStopRow()1295 protected Pair<byte[], byte[]> getStartAndStopRow() { 1296 return generateStartAndStopRows(100); 1297 } 1298 } 1299 1300 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { RandomScanWithRange1000Test(Connection con, TestOptions options, Status status)1301 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1302 super(con, options, status); 1303 } 1304 1305 @Override getStartAndStopRow()1306 protected Pair<byte[], byte[]> getStartAndStopRow() { 1307 return generateStartAndStopRows(1000); 1308 } 1309 } 1310 1311 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { RandomScanWithRange10000Test(Connection con, TestOptions options, Status status)1312 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1313 super(con, options, status); 1314 } 1315 1316 @Override getStartAndStopRow()1317 protected Pair<byte[], byte[]> getStartAndStopRow() { 1318 return generateStartAndStopRows(10000); 1319 } 1320 } 1321 1322 static class RandomReadTest extends TableTest { 1323 private final Consistency consistency; 1324 private ArrayList<Get> gets; 1325 private Random rd = new Random(); 1326 RandomReadTest(Connection con, TestOptions options, Status status)1327 RandomReadTest(Connection con, TestOptions options, Status status) { 1328 super(con, options, status); 1329 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1330 if (opts.multiGet > 0) { 1331 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1332 this.gets = new ArrayList<Get>(opts.multiGet); 1333 } 1334 } 1335 1336 @Override testRow(final int i)1337 void testRow(final int i) throws IOException, InterruptedException { 1338 if (opts.randomSleep > 0) { 1339 Thread.sleep(rd.nextInt(opts.randomSleep)); 1340 } 1341 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1342 if (opts.addColumns) { 1343 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1344 } else { 1345 get.addFamily(FAMILY_NAME); 1346 } 1347 if (opts.filterAll) { 1348 get.setFilter(new FilterAllFilter()); 1349 } 1350 get.setConsistency(consistency); 1351 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1352 if (opts.multiGet > 0) { 1353 this.gets.add(get); 1354 if (this.gets.size() == opts.multiGet) { 1355 Result [] rs = this.table.get(this.gets); 1356 updateValueSize(rs); 1357 this.gets.clear(); 1358 } 1359 } else { 1360 updateValueSize(this.table.get(get)); 1361 } 1362 } 1363 1364 @Override getReportingPeriod()1365 protected int getReportingPeriod() { 1366 int period = opts.perClientRunRows / 10; 1367 return period == 0 ? opts.perClientRunRows : period; 1368 } 1369 1370 @Override testTakedown()1371 protected void testTakedown() throws IOException { 1372 if (this.gets != null && this.gets.size() > 0) { 1373 this.table.get(gets); 1374 this.gets.clear(); 1375 } 1376 super.testTakedown(); 1377 } 1378 } 1379 1380 static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Connection con, TestOptions options, Status status)1381 RandomWriteTest(Connection con, TestOptions options, Status status) { 1382 super(con, options, status); 1383 } 1384 1385 @Override testRow(final int i)1386 void testRow(final int i) throws IOException { 1387 byte[] row = getRandomRow(this.rand, opts.totalRows); 1388 Put put = new Put(row); 1389 for (int column = 0; column < opts.columns; column++) { 1390 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1391 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1392 if (opts.useTags) { 1393 byte[] tag = generateData(this.rand, TAG_LENGTH); 1394 Tag[] tags = new Tag[opts.noOfTags]; 1395 for (int n = 0; n < opts.noOfTags; n++) { 1396 Tag t = new Tag((byte) n, tag); 1397 tags[n] = t; 1398 } 1399 KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, 1400 value, tags); 1401 put.add(kv); 1402 updateValueSize(kv.getValueLength()); 1403 } else { 1404 put.add(FAMILY_NAME, qualifier, value); 1405 updateValueSize(value.length); 1406 } 1407 } 1408 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1409 mutator.mutate(put); 1410 } 1411 } 1412 1413 static class ScanTest extends TableTest { 1414 private ResultScanner testScanner; 1415 ScanTest(Connection con, TestOptions options, Status status)1416 ScanTest(Connection con, TestOptions options, Status status) { 1417 super(con, options, status); 1418 } 1419 1420 @Override testTakedown()1421 void testTakedown() throws IOException { 1422 if (this.testScanner != null) { 1423 this.testScanner.close(); 1424 } 1425 super.testTakedown(); 1426 } 1427 1428 1429 @Override testRow(final int i)1430 void testRow(final int i) throws IOException { 1431 if (this.testScanner == null) { 1432 Scan scan = new Scan(format(opts.startRow)); 1433 scan.setCaching(opts.caching); 1434 if (opts.addColumns) { 1435 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1436 } else { 1437 scan.addFamily(FAMILY_NAME); 1438 } 1439 if (opts.filterAll) { 1440 scan.setFilter(new FilterAllFilter()); 1441 } 1442 this.testScanner = table.getScanner(scan); 1443 } 1444 Result r = testScanner.next(); 1445 updateValueSize(r); 1446 } 1447 } 1448 1449 /** 1450 * Base class for operations that are CAS-like; that read a value and then set it based off what 1451 * they read. In this category is increment, append, checkAndPut, etc. 1452 * 1453 * <p>These operations also want some concurrency going on. Usually when these tests run, they 1454 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 1455 * same key space. We do this with our getStartRow and getLastRow overrides. 1456 */ 1457 static abstract class CASTableTest extends TableTest { 1458 private final byte [] qualifier; CASTableTest(Connection con, TestOptions options, Status status)1459 CASTableTest(Connection con, TestOptions options, Status status) { 1460 super(con, options, status); 1461 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 1462 } 1463 getQualifier()1464 byte [] getQualifier() { 1465 return this.qualifier; 1466 } 1467 1468 @Override getStartRow()1469 int getStartRow() { 1470 return 0; 1471 } 1472 1473 @Override getLastRow()1474 int getLastRow() { 1475 return opts.perClientRunRows; 1476 } 1477 } 1478 1479 static class IncrementTest extends CASTableTest { IncrementTest(Connection con, TestOptions options, Status status)1480 IncrementTest(Connection con, TestOptions options, Status status) { 1481 super(con, options, status); 1482 } 1483 1484 @Override testRow(final int i)1485 void testRow(final int i) throws IOException { 1486 Increment increment = new Increment(format(i)); 1487 increment.addColumn(FAMILY_NAME, getQualifier(), 1l); 1488 updateValueSize(this.table.increment(increment)); 1489 } 1490 } 1491 1492 static class AppendTest extends CASTableTest { AppendTest(Connection con, TestOptions options, Status status)1493 AppendTest(Connection con, TestOptions options, Status status) { 1494 super(con, options, status); 1495 } 1496 1497 @Override testRow(final int i)1498 void testRow(final int i) throws IOException { 1499 byte [] bytes = format(i); 1500 Append append = new Append(bytes); 1501 append.add(FAMILY_NAME, getQualifier(), bytes); 1502 updateValueSize(this.table.append(append)); 1503 } 1504 } 1505 1506 static class CheckAndMutateTest extends CASTableTest { CheckAndMutateTest(Connection con, TestOptions options, Status status)1507 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 1508 super(con, options, status); 1509 } 1510 1511 @Override testRow(final int i)1512 void testRow(final int i) throws IOException { 1513 byte [] bytes = format(i); 1514 // Put a known value so when we go to check it, it is there. 1515 Put put = new Put(bytes); 1516 put.addColumn(FAMILY_NAME, getQualifier(), bytes); 1517 this.table.put(put); 1518 RowMutations mutations = new RowMutations(bytes); 1519 mutations.add(put); 1520 this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, 1521 mutations); 1522 } 1523 } 1524 1525 static class CheckAndPutTest extends CASTableTest { CheckAndPutTest(Connection con, TestOptions options, Status status)1526 CheckAndPutTest(Connection con, TestOptions options, Status status) { 1527 super(con, options, status); 1528 } 1529 1530 @Override testRow(final int i)1531 void testRow(final int i) throws IOException { 1532 byte [] bytes = format(i); 1533 // Put a known value so when we go to check it, it is there. 1534 Put put = new Put(bytes); 1535 put.addColumn(FAMILY_NAME, getQualifier(), bytes); 1536 this.table.put(put); 1537 this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, put); 1538 } 1539 } 1540 1541 static class CheckAndDeleteTest extends CASTableTest { CheckAndDeleteTest(Connection con, TestOptions options, Status status)1542 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 1543 super(con, options, status); 1544 } 1545 1546 @Override testRow(final int i)1547 void testRow(final int i) throws IOException { 1548 byte [] bytes = format(i); 1549 // Put a known value so when we go to check it, it is there. 1550 Put put = new Put(bytes); 1551 put.addColumn(FAMILY_NAME, getQualifier(), bytes); 1552 this.table.put(put); 1553 Delete delete = new Delete(put.getRow()); 1554 delete.addColumn(FAMILY_NAME, getQualifier()); 1555 this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, delete); 1556 } 1557 } 1558 1559 static class SequentialReadTest extends TableTest { SequentialReadTest(Connection con, TestOptions options, Status status)1560 SequentialReadTest(Connection con, TestOptions options, Status status) { 1561 super(con, options, status); 1562 } 1563 1564 @Override testRow(final int i)1565 void testRow(final int i) throws IOException { 1566 Get get = new Get(format(i)); 1567 if (opts.addColumns) { 1568 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1569 } 1570 if (opts.filterAll) { 1571 get.setFilter(new FilterAllFilter()); 1572 } 1573 updateValueSize(table.get(get)); 1574 } 1575 } 1576 1577 static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Connection con, TestOptions options, Status status)1578 SequentialWriteTest(Connection con, TestOptions options, Status status) { 1579 super(con, options, status); 1580 } 1581 1582 @Override testRow(final int i)1583 void testRow(final int i) throws IOException { 1584 byte[] row = format(i); 1585 Put put = new Put(row); 1586 for (int column = 0; column < opts.columns; column++) { 1587 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1588 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1589 if (opts.useTags) { 1590 byte[] tag = generateData(this.rand, TAG_LENGTH); 1591 Tag[] tags = new Tag[opts.noOfTags]; 1592 for (int n = 0; n < opts.noOfTags; n++) { 1593 Tag t = new Tag((byte) n, tag); 1594 tags[n] = t; 1595 } 1596 KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, 1597 value, tags); 1598 put.add(kv); 1599 updateValueSize(kv.getValueLength()); 1600 } else { 1601 put.add(FAMILY_NAME, qualifier, value); 1602 updateValueSize(value.length); 1603 } 1604 } 1605 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1606 mutator.mutate(put); 1607 } 1608 } 1609 1610 static class FilteredScanTest extends TableTest { 1611 protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); 1612 FilteredScanTest(Connection con, TestOptions options, Status status)1613 FilteredScanTest(Connection con, TestOptions options, Status status) { 1614 super(con, options, status); 1615 } 1616 1617 @Override testRow(int i)1618 void testRow(int i) throws IOException { 1619 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1620 Scan scan = constructScan(value); 1621 ResultScanner scanner = null; 1622 try { 1623 scanner = this.table.getScanner(scan); 1624 for (Result r = null; (r = scanner.next()) != null;) { 1625 updateValueSize(r); 1626 } 1627 } finally { 1628 if (scanner != null) scanner.close(); 1629 } 1630 } 1631 constructScan(byte[] valuePrefix)1632 protected Scan constructScan(byte[] valuePrefix) throws IOException { 1633 FilterList list = new FilterList(); 1634 Filter filter = new SingleColumnValueFilter( 1635 FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL, 1636 new BinaryComparator(valuePrefix) 1637 ); 1638 list.addFilter(filter); 1639 if(opts.filterAll) { 1640 list.addFilter(new FilterAllFilter()); 1641 } 1642 Scan scan = new Scan(); 1643 scan.setCaching(opts.caching); 1644 if (opts.addColumns) { 1645 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1646 } else { 1647 scan.addFamily(FAMILY_NAME); 1648 } 1649 scan.setFilter(list); 1650 return scan; 1651 } 1652 } 1653 1654 /** 1655 * Compute a throughput rate in MB/s. 1656 * @param rows Number of records consumed. 1657 * @param timeMs Time taken in milliseconds. 1658 * @return String value with label, ie '123.76 MB/s' 1659 */ calculateMbps(int rows, long timeMs, final int valueSize, int columns)1660 private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) { 1661 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH + 1662 ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns)); 1663 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 1664 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) 1665 .divide(BYTES_PER_MB, CXT); 1666 return FMT.format(mbps) + " MB/s"; 1667 } 1668 1669 /* 1670 * Format passed integer. 1671 * @param number 1672 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed 1673 * number (Does absolute in case number is negative). 1674 */ format(final int number)1675 public static byte [] format(final int number) { 1676 byte [] b = new byte[ROW_LENGTH]; 1677 int d = Math.abs(number); 1678 for (int i = b.length - 1; i >= 0; i--) { 1679 b[i] = (byte)((d % 10) + '0'); 1680 d /= 10; 1681 } 1682 return b; 1683 } 1684 1685 /* 1686 * This method takes some time and is done inline uploading data. For 1687 * example, doing the mapfile test, generation of the key and value 1688 * consumes about 30% of CPU time. 1689 * @return Generated random value to insert into a table cell. 1690 */ generateData(final Random r, int length)1691 public static byte[] generateData(final Random r, int length) { 1692 byte [] b = new byte [length]; 1693 int i; 1694 1695 for(i = 0; i < (length-8); i += 8) { 1696 b[i] = (byte) (65 + r.nextInt(26)); 1697 b[i+1] = b[i]; 1698 b[i+2] = b[i]; 1699 b[i+3] = b[i]; 1700 b[i+4] = b[i]; 1701 b[i+5] = b[i]; 1702 b[i+6] = b[i]; 1703 b[i+7] = b[i]; 1704 } 1705 1706 byte a = (byte) (65 + r.nextInt(26)); 1707 for(; i < length; i++) { 1708 b[i] = a; 1709 } 1710 return b; 1711 } 1712 1713 /** 1714 * @deprecated Use {@link #generateData(java.util.Random, int)} instead. 1715 * @return Generated random value to insert into a table cell. 1716 */ 1717 @Deprecated generateValue(final Random r)1718 public static byte[] generateValue(final Random r) { 1719 return generateData(r, DEFAULT_VALUE_LENGTH); 1720 } 1721 getRandomRow(final Random random, final int totalRows)1722 static byte [] getRandomRow(final Random random, final int totalRows) { 1723 return format(random.nextInt(Integer.MAX_VALUE) % totalRows); 1724 } 1725 runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con, TestOptions opts, final Status status)1726 static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con, 1727 TestOptions opts, final Status status) 1728 throws IOException, InterruptedException { 1729 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " + 1730 opts.perClientRunRows + " rows"); 1731 long totalElapsedTime; 1732 1733 final Test t; 1734 try { 1735 Constructor<? extends Test> constructor = 1736 cmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 1737 t = constructor.newInstance(con, opts, status); 1738 } catch (NoSuchMethodException e) { 1739 throw new IllegalArgumentException("Invalid command class: " + 1740 cmd.getName() + ". It does not provide a constructor as described by " + 1741 "the javadoc comment. Available constructors are: " + 1742 Arrays.toString(cmd.getConstructors())); 1743 } catch (Exception e) { 1744 throw new IllegalStateException("Failed to construct command class", e); 1745 } 1746 totalElapsedTime = t.test(); 1747 1748 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 1749 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + 1750 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 1751 getAverageValueLength(opts), opts.columns) + ")"); 1752 1753 return new RunResult(totalElapsedTime, t.getLatency()); 1754 } 1755 getAverageValueLength(final TestOptions opts)1756 private static int getAverageValueLength(final TestOptions opts) { 1757 return opts.valueRandom? opts.valueSize/2: opts.valueSize; 1758 } 1759 runTest(final Class<? extends Test> cmd, TestOptions opts)1760 private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException, 1761 InterruptedException, ClassNotFoundException { 1762 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 1763 // the TestOptions introspection for us and dump the output in a readable format. 1764 LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts)); 1765 try(Connection conn = ConnectionFactory.createConnection(getConf()); 1766 Admin admin = conn.getAdmin()) { 1767 checkTable(admin, opts); 1768 } 1769 if (opts.nomapred) { 1770 doLocalClients(opts, getConf()); 1771 } else { 1772 doMapReduce(opts, getConf()); 1773 } 1774 } 1775 printUsage()1776 protected void printUsage() { 1777 printUsage(this.getClass().getName(), null); 1778 } 1779 printUsage(final String message)1780 protected static void printUsage(final String message) { 1781 printUsage(PerformanceEvaluation.class.getName(), message); 1782 } 1783 printUsageAndExit(final String message, final int exitCode)1784 protected static void printUsageAndExit(final String message, final int exitCode) { 1785 printUsage(message); 1786 System.exit(exitCode); 1787 } 1788 printUsage(final String className, final String message)1789 protected static void printUsage(final String className, final String message) { 1790 if (message != null && message.length() > 0) { 1791 System.err.println(message); 1792 } 1793 System.err.println("Usage: java " + className + " \\"); 1794 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 1795 System.err.println(); 1796 System.err.println("Options:"); 1797 System.err.println(" nomapred Run multiple clients using threads " + 1798 "(rather than use mapreduce)"); 1799 System.err.println(" rows Rows each client runs. Default: One million"); 1800 System.err.println(" size Total size in GiB. Mutually exclusive with --rows. " + 1801 "Default: 1.0."); 1802 System.err.println(" sampleRate Execute test on a sample of total " + 1803 "rows. Only supported by randomRead. Default: 1.0"); 1804 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " + 1805 "Default: 0"); 1806 System.err.println(" table Alternate table name. Default: 'TestTable'"); 1807 System.err.println(" multiGet If >0, when doing RandomRead, perform multiple gets " + 1808 "instead of single gets. Default: 0"); 1809 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 1810 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 1811 "Default: false"); 1812 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 1813 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 1814 System.err.println(" oneCon all the threads share the same connection. Default: False"); 1815 System.err.println(" presplit Create presplit table. Recommended for accurate perf " + 1816 "analysis (see guide). Default: disabled"); 1817 System.err.println(" inmemory Tries to keep the HFiles of the CF " + 1818 "inmemory as far as possible. Not guaranteed that reads are always served " + 1819 "from memory. Default: false"); 1820 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 1821 "Default: false"); 1822 System.err.println(" numoftags Specify the no of tags that would be needed. " + 1823 "This works only if usetags is true."); 1824 System.err.println(" filterAll Helps to filter out all the rows on the server side" 1825 + " there by not returning any thing back to the client. Helps to check the server side" 1826 + " performance. Uses FilterAllFilter internally. "); 1827 System.err.println(" latency Set to report operation latencies. Default: False"); 1828 System.err.println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 1829 System.err.println(" valueSize Pass value size to use: Default: 1024"); 1830 System.err.println(" valueRandom Set if we should vary value size between 0 and " + 1831 "'valueSize'; set on read for stats on size: Default: Not set."); 1832 System.err.println(" valueZipf Set if we should vary value size between 0 and " + 1833 "'valueSize' in zipf form: Default: Not set."); 1834 System.err.println(" period Report every 'period' rows: " + 1835 "Default: opts.perClientRunRows / 10"); 1836 System.err.println(" multiGet Batch gets together into groups of N. Only supported " + 1837 "by randomRead. Default: disabled"); 1838 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 1839 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 1840 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 1841 System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 1842 System.err.println(" columns Columns to write per row. Default: 1"); 1843 System.err.println(" caching Scan caching to use. Default: 30"); 1844 System.err.println(); 1845 System.err.println(" Note: -D properties will be applied to the conf used. "); 1846 System.err.println(" For example: "); 1847 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 1848 System.err.println(" -Dmapreduce.task.timeout=60000"); 1849 System.err.println(); 1850 System.err.println("Command:"); 1851 for (CmdDescriptor command : COMMANDS.values()) { 1852 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); 1853 } 1854 System.err.println(); 1855 System.err.println("Args:"); 1856 System.err.println(" nclients Integer. Required. Total number of " + 1857 "clients (and HRegionServers)"); 1858 System.err.println(" running: 1 <= value <= 500"); 1859 System.err.println("Examples:"); 1860 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 1861 System.err.println(" $ bin/hbase " + className + " sequentialWrite 1"); 1862 System.err.println(" To run 10 clients doing increments over ten rows:"); 1863 System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10"); 1864 } 1865 1866 /** 1867 * Parse options passed in via an arguments array. Assumes that array has been split 1868 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain 1869 * in the queue at the conclusion of this method call. It's up to the caller to deal 1870 * with these unrecognized arguments. 1871 */ parseOpts(Queue<String> args)1872 static TestOptions parseOpts(Queue<String> args) { 1873 TestOptions opts = new TestOptions(); 1874 1875 String cmd = null; 1876 while ((cmd = args.poll()) != null) { 1877 if (cmd.equals("-h") || cmd.startsWith("--h")) { 1878 // place item back onto queue so that caller knows parsing was incomplete 1879 args.add(cmd); 1880 break; 1881 } 1882 1883 final String nmr = "--nomapred"; 1884 if (cmd.startsWith(nmr)) { 1885 opts.nomapred = true; 1886 continue; 1887 } 1888 1889 final String rows = "--rows="; 1890 if (cmd.startsWith(rows)) { 1891 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 1892 continue; 1893 } 1894 1895 final String sampleRate = "--sampleRate="; 1896 if (cmd.startsWith(sampleRate)) { 1897 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 1898 continue; 1899 } 1900 1901 final String table = "--table="; 1902 if (cmd.startsWith(table)) { 1903 opts.tableName = cmd.substring(table.length()); 1904 continue; 1905 } 1906 1907 final String startRow = "--startRow="; 1908 if (cmd.startsWith(startRow)) { 1909 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 1910 continue; 1911 } 1912 1913 final String compress = "--compress="; 1914 if (cmd.startsWith(compress)) { 1915 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 1916 continue; 1917 } 1918 1919 final String traceRate = "--traceRate="; 1920 if (cmd.startsWith(traceRate)) { 1921 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 1922 continue; 1923 } 1924 1925 final String blockEncoding = "--blockEncoding="; 1926 if (cmd.startsWith(blockEncoding)) { 1927 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 1928 continue; 1929 } 1930 1931 final String flushCommits = "--flushCommits="; 1932 if (cmd.startsWith(flushCommits)) { 1933 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 1934 continue; 1935 } 1936 1937 final String writeToWAL = "--writeToWAL="; 1938 if (cmd.startsWith(writeToWAL)) { 1939 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 1940 continue; 1941 } 1942 1943 final String presplit = "--presplit="; 1944 if (cmd.startsWith(presplit)) { 1945 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 1946 continue; 1947 } 1948 1949 final String inMemory = "--inmemory="; 1950 if (cmd.startsWith(inMemory)) { 1951 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 1952 continue; 1953 } 1954 1955 final String autoFlush = "--autoFlush="; 1956 if (cmd.startsWith(autoFlush)) { 1957 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 1958 continue; 1959 } 1960 1961 final String onceCon = "--oneCon="; 1962 if (cmd.startsWith(onceCon)) { 1963 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 1964 continue; 1965 } 1966 1967 final String latency = "--latency"; 1968 if (cmd.startsWith(latency)) { 1969 opts.reportLatency = true; 1970 continue; 1971 } 1972 1973 final String multiGet = "--multiGet="; 1974 if (cmd.startsWith(multiGet)) { 1975 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 1976 continue; 1977 } 1978 1979 final String useTags = "--usetags="; 1980 if (cmd.startsWith(useTags)) { 1981 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 1982 continue; 1983 } 1984 1985 final String noOfTags = "--numoftags="; 1986 if (cmd.startsWith(noOfTags)) { 1987 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 1988 continue; 1989 } 1990 1991 final String replicas = "--replicas="; 1992 if (cmd.startsWith(replicas)) { 1993 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 1994 continue; 1995 } 1996 1997 final String filterOutAll = "--filterAll"; 1998 if (cmd.startsWith(filterOutAll)) { 1999 opts.filterAll = true; 2000 continue; 2001 } 2002 2003 final String size = "--size="; 2004 if (cmd.startsWith(size)) { 2005 opts.size = Float.parseFloat(cmd.substring(size.length())); 2006 continue; 2007 } 2008 2009 final String splitPolicy = "--splitPolicy="; 2010 if (cmd.startsWith(splitPolicy)) { 2011 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2012 continue; 2013 } 2014 2015 final String randomSleep = "--randomSleep="; 2016 if (cmd.startsWith(randomSleep)) { 2017 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2018 continue; 2019 } 2020 2021 final String bloomFilter = "--bloomFilter="; 2022 if (cmd.startsWith(bloomFilter)) { 2023 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2024 continue; 2025 } 2026 2027 final String valueSize = "--valueSize="; 2028 if (cmd.startsWith(valueSize)) { 2029 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2030 continue; 2031 } 2032 2033 final String valueRandom = "--valueRandom"; 2034 if (cmd.startsWith(valueRandom)) { 2035 opts.valueRandom = true; 2036 if (opts.valueZipf) { 2037 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2038 } 2039 continue; 2040 } 2041 2042 final String valueZipf = "--valueZipf"; 2043 if (cmd.startsWith(valueZipf)) { 2044 opts.valueZipf = true; 2045 if (opts.valueRandom) { 2046 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2047 } 2048 continue; 2049 } 2050 2051 final String period = "--period="; 2052 if (cmd.startsWith(period)) { 2053 opts.period = Integer.parseInt(cmd.substring(period.length())); 2054 continue; 2055 } 2056 2057 final String addColumns = "--addColumns="; 2058 if (cmd.startsWith(addColumns)) { 2059 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2060 continue; 2061 } 2062 2063 final String columns = "--columns="; 2064 if (cmd.startsWith(columns)) { 2065 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2066 continue; 2067 } 2068 2069 final String caching = "--caching="; 2070 if (cmd.startsWith(caching)) { 2071 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2072 continue; 2073 } 2074 2075 if (isCommandClass(cmd)) { 2076 opts.cmdName = cmd; 2077 opts.numClientThreads = Integer.parseInt(args.remove()); 2078 int rowsPerGB = getRowsPerGB(opts); 2079 if (opts.size != DEFAULT_OPTS.size && 2080 opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { 2081 throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments."); 2082 } 2083 if (opts.size != DEFAULT_OPTS.size) { 2084 // total size in GB specified 2085 opts.totalRows = (int) opts.size * rowsPerGB; 2086 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 2087 } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { 2088 // number of rows specified 2089 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 2090 opts.size = opts.totalRows / rowsPerGB; 2091 } 2092 break; 2093 } else { 2094 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2095 } 2096 2097 // Not matching any option or command. 2098 System.err.println("Error: Wrong option or command: " + cmd); 2099 args.add(cmd); 2100 break; 2101 } 2102 return opts; 2103 } 2104 getRowsPerGB(final TestOptions opts)2105 static int getRowsPerGB(final TestOptions opts) { 2106 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns()); 2107 } 2108 2109 @Override run(String[] args)2110 public int run(String[] args) throws Exception { 2111 // Process command-line args. TODO: Better cmd-line processing 2112 // (but hopefully something not as painful as cli options). 2113 int errCode = -1; 2114 if (args.length < 1) { 2115 printUsage(); 2116 return errCode; 2117 } 2118 2119 try { 2120 LinkedList<String> argv = new LinkedList<String>(); 2121 argv.addAll(Arrays.asList(args)); 2122 TestOptions opts = parseOpts(argv); 2123 2124 // args remaining, print help and exit 2125 if (!argv.isEmpty()) { 2126 errCode = 0; 2127 printUsage(); 2128 return errCode; 2129 } 2130 2131 // must run at least 1 client 2132 if (opts.numClientThreads <= 0) { 2133 throw new IllegalArgumentException("Number of clients must be > 0"); 2134 } 2135 2136 Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName); 2137 if (cmdClass != null) { 2138 runTest(cmdClass, opts); 2139 errCode = 0; 2140 } 2141 2142 } catch (Exception e) { 2143 e.printStackTrace(); 2144 } 2145 2146 return errCode; 2147 } 2148 isCommandClass(String cmd)2149 private static boolean isCommandClass(String cmd) { 2150 return COMMANDS.containsKey(cmd); 2151 } 2152 determineCommandClass(String cmd)2153 private static Class<? extends Test> determineCommandClass(String cmd) { 2154 CmdDescriptor descriptor = COMMANDS.get(cmd); 2155 return descriptor != null ? descriptor.getCmdClass() : null; 2156 } 2157 main(final String[] args)2158 public static void main(final String[] args) throws Exception { 2159 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 2160 System.exit(res); 2161 } 2162 } 2163