1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase;
20 
21 import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY;
22 
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.lang.reflect.Constructor;
26 import java.math.BigDecimal;
27 import java.math.MathContext;
28 import java.text.DecimalFormat;
29 import java.text.SimpleDateFormat;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.LinkedList;
34 import java.util.Map;
35 import java.util.Queue;
36 import java.util.Random;
37 import java.util.TreeMap;
38 import java.util.concurrent.Callable;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43 
44 import com.google.common.base.Objects;
45 import com.google.common.util.concurrent.ThreadFactoryBuilder;
46 
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.conf.Configured;
51 import org.apache.hadoop.fs.FileSystem;
52 import org.apache.hadoop.fs.Path;
53 import org.apache.hadoop.hbase.classification.InterfaceAudience;
54 import org.apache.hadoop.hbase.client.Admin;
55 import org.apache.hadoop.hbase.client.Append;
56 import org.apache.hadoop.hbase.client.BufferedMutator;
57 import org.apache.hadoop.hbase.client.Connection;
58 import org.apache.hadoop.hbase.client.ConnectionFactory;
59 import org.apache.hadoop.hbase.client.Consistency;
60 import org.apache.hadoop.hbase.client.Delete;
61 import org.apache.hadoop.hbase.client.Durability;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.client.Increment;
64 import org.apache.hadoop.hbase.client.Put;
65 import org.apache.hadoop.hbase.client.Result;
66 import org.apache.hadoop.hbase.client.ResultScanner;
67 import org.apache.hadoop.hbase.client.RowMutations;
68 import org.apache.hadoop.hbase.client.Scan;
69 import org.apache.hadoop.hbase.client.Table;
70 import org.apache.hadoop.hbase.filter.BinaryComparator;
71 import org.apache.hadoop.hbase.filter.CompareFilter;
72 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
73 import org.apache.hadoop.hbase.filter.Filter;
74 import org.apache.hadoop.hbase.filter.FilterAllFilter;
75 import org.apache.hadoop.hbase.filter.FilterList;
76 import org.apache.hadoop.hbase.filter.PageFilter;
77 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
78 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
79 import org.apache.hadoop.hbase.io.compress.Compression;
80 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
81 import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
82 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
83 import org.apache.hadoop.hbase.regionserver.BloomType;
84 import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
85 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
86 import org.apache.hadoop.hbase.util.*;
87 import org.apache.hadoop.io.LongWritable;
88 import org.apache.hadoop.io.Text;
89 import org.apache.hadoop.mapreduce.Job;
90 import org.apache.hadoop.mapreduce.Mapper;
91 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
92 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
93 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
94 import org.apache.hadoop.util.Tool;
95 import org.apache.hadoop.util.ToolRunner;
96 import org.codehaus.jackson.map.ObjectMapper;
97 
98 import com.yammer.metrics.core.Histogram;
99 import com.yammer.metrics.stats.UniformSample;
100 import com.yammer.metrics.stats.Snapshot;
101 
102 import org.apache.htrace.Sampler;
103 import org.apache.htrace.Trace;
104 import org.apache.htrace.TraceScope;
105 import org.apache.htrace.impl.ProbabilitySampler;
106 
107 /**
108  * Script used to evaluate HBase performance and scalability.  Runs an HBase
109  * client that steps through one of a set of hardcoded tests or 'experiments'
110  * (e.g. a random reads test, a random writes test, etc.). Pass on the
111  * command-line which test to run and how many clients are participating in
112  * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
113  *
114  * <p>This class sets up and runs the evaluation programs described in
115  * Section 7, <i>Performance Evaluation</i>, of the <a
116  * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
117  * paper, pages 8-10.
118  *
119  * <p>By default, runs as a mapreduce job where each mapper runs a single test
120  * client. Can also run as a non-mapreduce, multithreaded application by
121  * specifying {@code --nomapred}. Each client does about 1GB of data, unless
122  * specified otherwise.
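 *
 * <p>For example, an illustrative invocation (run {@code --help} for the authoritative
 * usage and option names; the values below are only a sketch) might be:
 * <pre>
 * # 4 client threads, no MapReduce, 100000 rows per client
 * hbase org.apache.hadoop.hbase.PerformanceEvaluation --nomapred --rows=100000 randomWrite 4
 * </pre>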
123  */
124 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
125 public class PerformanceEvaluation extends Configured implements Tool {
126   private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
127   private static final ObjectMapper MAPPER = new ObjectMapper();
128   static {
129     MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
130   }
131 
132   public static final String TABLE_NAME = "TestTable";
133   public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
134   public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
135   public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
136   public static final int DEFAULT_VALUE_LENGTH = 1000;
137   public static final int ROW_LENGTH = 26;
138 
139   private static final int ONE_GB = 1024 * 1024 * 1000;
140   private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
141   // TODO : should we make this configurable
142   private static final int TAG_LENGTH = 256;
143   private static final DecimalFormat FMT = new DecimalFormat("0.##");
144   private static final MathContext CXT = MathContext.DECIMAL64;
145   private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
146   private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
147   private static final TestOptions DEFAULT_OPTS = new TestOptions();
148 
149   private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
150   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
151 
152   static {
153     addCommandDescriptor(RandomReadTest.class, "randomRead",
154       "Run random read test");
155     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
156       "Run random seek and scan 100 test");
157     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
158       "Run random seek scan with both start and stop row (max 10 rows)");
159     addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
160       "Run random seek scan with both start and stop row (max 100 rows)");
161     addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
162       "Run random seek scan with both start and stop row (max 1000 rows)");
163     addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
164       "Run random seek scan with both start and stop row (max 10000 rows)");
165     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
166       "Run random write test");
167     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
168       "Run sequential read test");
169     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
170       "Run sequential write test");
171     addCommandDescriptor(ScanTest.class, "scan",
172       "Run scan test (read every row)");
173     addCommandDescriptor(FilteredScanTest.class, "filterScan",
174       "Run scan test using a filter to find a specific row based on its value " +
175       "(make sure to use --rows=20)");
176     addCommandDescriptor(IncrementTest.class, "increment",
177       "Increment on each row; clients overlap on keyspace so some concurrent operations");
178     addCommandDescriptor(AppendTest.class, "append",
179       "Append on each row; clients overlap on keyspace so some concurrent operations");
180     addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
181       "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
182     addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
183       "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
184     addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
185       "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
186   }
187 
188   /**
189    * Enum for map metrics.  Keep it out here rather than inside the Map
190    * inner-class so we can find associated properties.
191    */
192   protected static enum Counter {
193     /** elapsed time */
194     ELAPSED_TIME,
195     /** number of rows */
196     ROWS
197   }
198 
199   protected static class RunResult implements Comparable<RunResult> {
200     public RunResult(long duration, Histogram hist) {
201       this.duration = duration;
202       this.hist = hist;
203     }
204 
205     public final long duration;
206     public final Histogram hist;
207 
208     @Override
209     public String toString() {
210       return Long.toString(duration);
211     }
212 
213     @Override public int compareTo(RunResult o) {
214       return Long.compare(this.duration, o.duration);
215     }
216   }
217 
218   /**
219    * Constructor
220    * @param conf Configuration object
221    */
222   public PerformanceEvaluation(final Configuration conf) {
223     super(conf);
224   }
225 
226   protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
227       String name, String description) {
228     CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
229     COMMANDS.put(name, cmdDescriptor);
230   }
231 
232   /**
233    * Implementations can have their status set.
234    */
235   interface Status {
236     /**
237      * Sets status
238      * @param msg status message
239      * @throws IOException
240      */
241     void setStatus(final String msg) throws IOException;
242   }
243 
244   /**
245    * MapReduce job that runs a performance evaluation client in each map task.
246    */
247   public static class EvaluationMapTask
248       extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
249 
250     /** configuration parameter name that contains the command */
251     public final static String CMD_KEY = "EvaluationMapTask.command";
252     /** configuration parameter name that contains the PE impl */
253     public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
254 
255     private Class<? extends Test> cmd;
256 
257     @Override
258     protected void setup(Context context) throws IOException, InterruptedException {
259       this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
260 
261       // this is required so that extensions of PE are instantiated within the
262       // map reduce task...
263       Class<? extends PerformanceEvaluation> peClass =
264           forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
265       try {
266         peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
267       } catch (Exception e) {
268         throw new IllegalStateException("Could not instantiate PE instance", e);
269       }
270     }
271 
272     private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
273       try {
274         return Class.forName(className).asSubclass(type);
275       } catch (ClassNotFoundException e) {
276         throw new IllegalStateException("Could not find class for name: " + className, e);
277       }
278     }
279 
280     @Override
281     protected void map(LongWritable key, Text value, final Context context)
282            throws IOException, InterruptedException {
283 
284       Status status = new Status() {
285         @Override
286         public void setStatus(String msg) {
287            context.setStatus(msg);
288         }
289       };
290 
291       ObjectMapper mapper = new ObjectMapper();
292       TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
293       Configuration conf = HBaseConfiguration.create(context.getConfiguration());
294       final Connection con = ConnectionFactory.createConnection(conf);
295 
296       // Evaluation task
297       RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status);
298       // Collect how much time the thing took. Report as map output and
299       // to the ELAPSED_TIME counter.
300       context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
301       context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
302       context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
303       context.progress();
304     }
305   }
306 
307   /*
308    * If the table does not already exist, create it. Also recreate the table when
309    * {@code opts.presplitRegions} is specified or when the existing table's split
310    * policy or region replica count does not match the requested options.
311    */
312   static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
313     TableName tableName = TableName.valueOf(opts.tableName);
314     boolean needsDelete = false, exists = admin.tableExists(tableName);
315     boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
316       || opts.cmdName.toLowerCase().contains("scan");
317     if (!exists && isReadCmd) {
318       throw new IllegalStateException(
319         "Must specify an existing table for read commands. Run a write command first.");
320     }
321     HTableDescriptor desc =
322       exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
323     byte[][] splits = getSplits(opts);
324 
325     // recreate the table when user has requested presplit or when existing
326     // {RegionSplitPolicy,replica count} does not match requested.
327     if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
328       || (!isReadCmd && desc != null && !Objects.equal(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
329       || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) {
330       needsDelete = true;
331       // wait, why did it delete my table?!?
332       LOG.debug(Objects.toStringHelper("needsDelete")
333         .add("needsDelete", needsDelete)
334         .add("isReadCmd", isReadCmd)
335         .add("exists", exists)
336         .add("desc", desc)
337         .add("presplit", opts.presplitRegions)
338         .add("splitPolicy", opts.splitPolicy)
339         .add("replicas", opts.replicas));
340     }
341 
342     // remove an existing table
343     if (needsDelete) {
344       if (admin.isTableEnabled(tableName)) {
345         admin.disableTable(tableName);
346       }
347       admin.deleteTable(tableName);
348     }
349 
350     // table creation is necessary
351     if (!exists || needsDelete) {
352       desc = getTableDescriptor(opts);
353       if (splits != null) {
354         if (LOG.isDebugEnabled()) {
355           for (int i = 0; i < splits.length; i++) {
356             LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
357           }
358         }
359       }
360       admin.createTable(desc, splits);
361       LOG.info("Table " + desc + " created");
362     }
363     return admin.tableExists(tableName);
364   }
365 
366   /**
367    * Create an HTableDescriptor from provided TestOptions.
368    */
369   protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
370     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(opts.tableName));
371     HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
372     family.setDataBlockEncoding(opts.blockEncoding);
373     family.setCompressionType(opts.compression);
374     family.setBloomFilterType(opts.bloomType);
375     if (opts.inMemoryCF) {
376       family.setInMemory(true);
377     }
378     desc.addFamily(family);
379     if (opts.replicas != DEFAULT_OPTS.replicas) {
380       desc.setRegionReplication(opts.replicas);
381     }
382     if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
383       desc.setRegionSplitPolicyClassName(opts.splitPolicy);
384     }
385     return desc;
386   }
387 
388   /**
389    * Generates split points based on the total number of rows and the requested presplit region count.
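   * For example (illustrative arithmetic): with totalRows=1000 and presplitRegions=4,
   * jump = 1000 / 4 = 250, so the split points are format(250), format(500) and
   * format(750), yielding four regions of roughly equal row ranges.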
390    */
391   protected static byte[][] getSplits(TestOptions opts) {
392     if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
393       return null;
394 
395     int numSplitPoints = opts.presplitRegions - 1;
396     byte[][] splits = new byte[numSplitPoints][];
397     int jump = opts.totalRows / opts.presplitRegions;
398     for (int i = 0; i < numSplitPoints; i++) {
399       int rowkey = jump * (1 + i);
400       splits[i] = format(rowkey);
401     }
402     return splits;
403   }
404 
405   /*
406    * Run all clients in this VM, each in its own thread.
407    */
408   static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
409       throws IOException, InterruptedException {
410     final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
411     assert cmd != null;
412     @SuppressWarnings("unchecked")
413     Future<RunResult>[] threads = new Future[opts.numClientThreads];
414     RunResult[] results = new RunResult[opts.numClientThreads];
415     ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
416       new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
417     final Connection con = ConnectionFactory.createConnection(conf);
418     for (int i = 0; i < threads.length; i++) {
419       final int index = i;
420       threads[i] = pool.submit(new Callable<RunResult>() {
421         @Override
422         public RunResult call() throws Exception {
423           TestOptions threadOpts = new TestOptions(opts);
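          // Unless an explicit start row was requested (non-zero), carve the key
          // space into contiguous per-thread slices of perClientRunRows rows each.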
424           if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
425           RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() {
426             @Override
427             public void setStatus(final String msg) throws IOException {
428               LOG.info(msg);
429             }
430           });
431           LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
432             "ms over " + threadOpts.perClientRunRows + " rows");
433           return run;
434         }
435       });
436     }
437     pool.shutdown();
438 
439     for (int i = 0; i < threads.length; i++) {
440       try {
441         results[i] = threads[i].get();
442       } catch (ExecutionException e) {
443         throw new IOException(e.getCause());
444       }
445     }
446     final String test = cmd.getSimpleName();
447     LOG.info("[" + test + "] Summary of timings (ms): "
448              + Arrays.toString(results));
449     Arrays.sort(results);
450     long total = 0;
451     for (RunResult result : results) {
452       total += result.duration;
453     }
454     LOG.info("[" + test + "]"
455       + "\tMin: " + results[0] + "ms"
456       + "\tMax: " + results[results.length - 1] + "ms"
457       + "\tAvg: " + (total / results.length) + "ms");
458 
459     con.close();
460 
461     return results;
462   }
463 
464   /*
465    * Run a mapreduce job.  Run as many maps as asked-for clients.
466    * Before we start up the job, write out an input file with an instruction
467    * per client regarding which row to start on.
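   * Each line of that file is one JSON-serialized TestOptions (see writeInputFile),
   * so each map task receives exactly one client's options.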
468    * @param cmd Command to run.
469    * @throws IOException
470    */
471   static Job doMapReduce(TestOptions opts, final Configuration conf)
472       throws IOException, InterruptedException, ClassNotFoundException {
473     final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
474     assert cmd != null;
475     Path inputDir = writeInputFile(conf, opts);
476     conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
477     conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
478     Job job = Job.getInstance(conf);
479     job.setJarByClass(PerformanceEvaluation.class);
480     job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
481 
482     job.setInputFormatClass(NLineInputFormat.class);
483     NLineInputFormat.setInputPaths(job, inputDir);
484     // this is default, but be explicit about it just in case.
485     NLineInputFormat.setNumLinesPerSplit(job, 1);
486 
487     job.setOutputKeyClass(LongWritable.class);
488     job.setOutputValueClass(LongWritable.class);
489 
490     job.setMapperClass(EvaluationMapTask.class);
491     job.setReducerClass(LongSumReducer.class);
492 
493     job.setNumReduceTasks(1);
494 
495     job.setOutputFormatClass(TextOutputFormat.class);
496     TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
497 
498     TableMapReduceUtil.addDependencyJars(job);
499     TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
500       Histogram.class,     // yammer metrics
501       ObjectMapper.class); // jackson-mapper-asl
502 
503     TableMapReduceUtil.initCredentials(job);
504 
505     job.waitForCompletion(true);
506     return job;
507   }
508 
509   /*
510    * Write input file of offsets-per-client for the mapreduce job.
511    * @param c Configuration
512    * @return Directory that contains file written.
513    * @throws IOException
514    */
515   private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
516     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
517     Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
518     Path inputDir = new Path(jobdir, "inputs");
519 
520     FileSystem fs = FileSystem.get(c);
521     fs.mkdirs(inputDir);
522 
523     Path inputFile = new Path(inputDir, "input.txt");
524     PrintStream out = new PrintStream(fs.create(inputFile));
525     // Make input random.
526     Map<Integer, String> m = new TreeMap<Integer, String>();
527     Hash h = MurmurHash.getInstance();
528     int perClientRows = (opts.totalRows / opts.numClientThreads);
529     try {
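      // Each client's range is further split into ten chunks; keying the map by a
      // MurmurHash of the serialized options shuffles the order in which chunks
      // are handed out to map tasks.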
530       for (int i = 0; i < 10; i++) {
531         for (int j = 0; j < opts.numClientThreads; j++) {
532           TestOptions next = new TestOptions(opts);
533           next.startRow = (j * perClientRows) + (i * (perClientRows/10));
534           next.perClientRunRows = perClientRows / 10;
535           String s = MAPPER.writeValueAsString(next);
536           LOG.info("maptask input=" + s);
537           int hash = h.hash(Bytes.toBytes(s));
538           m.put(hash, s);
539         }
540       }
541       for (Map.Entry<Integer, String> e: m.entrySet()) {
542         out.println(e.getValue());
543       }
544     } finally {
545       out.close();
546     }
547     return inputDir;
548   }
549 
550   /**
551    * Describes a command.
552    */
553   static class CmdDescriptor {
554     private Class<? extends Test> cmdClass;
555     private String name;
556     private String description;
557 
558     CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
559       this.cmdClass = cmdClass;
560       this.name = name;
561       this.description = description;
562     }
563 
564     public Class<? extends Test> getCmdClass() {
565       return cmdClass;
566     }
567 
568     public String getName() {
569       return name;
570     }
571 
572     public String getDescription() {
573       return description;
574     }
575   }
576 
577   /**
578    * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
579    * This makes tracking all these arguments a little easier.
580    * NOTE: When adding an option, you need to add a data member, a getter/setter (to make JSON
581    * serialization of this TestOptions class behave), and an assignment in the clone constructor
582    * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
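   * For example, adding a hypothetical option {@code foo} (the name is for illustration
   * only) would mean: a data member {@code int foo = 0;}, a {@code getFoo()}/{@code setFoo(int)}
   * pair, and a {@code this.foo = that.foo;} line in the clone constructor.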
583    */
584   static class TestOptions {
585     String cmdName = null;
586     boolean nomapred = false;
587     boolean filterAll = false;
588     int startRow = 0;
589     float size = 1.0f;
590     int perClientRunRows = DEFAULT_ROWS_PER_GB;
591     int numClientThreads = 1;
592     int totalRows = DEFAULT_ROWS_PER_GB;
593     float sampleRate = 1.0f;
594     double traceRate = 0.0;
595     String tableName = TABLE_NAME;
596     boolean flushCommits = true;
597     boolean writeToWAL = true;
598     boolean autoFlush = false;
599     boolean oneCon = false;
600     boolean useTags = false;
601     int noOfTags = 1;
602     boolean reportLatency = false;
603     int multiGet = 0;
604     int randomSleep = 0;
605     boolean inMemoryCF = false;
606     int presplitRegions = 0;
607     int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
608     String splitPolicy = null;
609     Compression.Algorithm compression = Compression.Algorithm.NONE;
610     BloomType bloomType = BloomType.ROW;
611     DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
612     boolean valueRandom = false;
613     boolean valueZipf = false;
614     int valueSize = DEFAULT_VALUE_LENGTH;
615     int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
616     int columns = 1;
617     int caching = 30;
618     boolean addColumns = true;
619 
620     public TestOptions() {}
621 
622     /**
623      * Clone constructor.
624      * @param that Object to copy from.
625      */
626     public TestOptions(TestOptions that) {
627       this.cmdName = that.cmdName;
628       this.nomapred = that.nomapred;
629       this.startRow = that.startRow;
630       this.size = that.size;
631       this.perClientRunRows = that.perClientRunRows;
632       this.numClientThreads = that.numClientThreads;
633       this.totalRows = that.totalRows;
634       this.sampleRate = that.sampleRate;
635       this.traceRate = that.traceRate;
636       this.tableName = that.tableName;
637       this.flushCommits = that.flushCommits;
638       this.writeToWAL = that.writeToWAL;
639       this.autoFlush = that.autoFlush;
640       this.oneCon = that.oneCon;
641       this.useTags = that.useTags;
642       this.noOfTags = that.noOfTags;
643       this.reportLatency = that.reportLatency;
644       this.multiGet = that.multiGet;
645       this.inMemoryCF = that.inMemoryCF;
646       this.presplitRegions = that.presplitRegions;
647       this.replicas = that.replicas;
648       this.splitPolicy = that.splitPolicy;
649       this.compression = that.compression;
650       this.blockEncoding = that.blockEncoding;
651       this.filterAll = that.filterAll;
652       this.bloomType = that.bloomType;
653       this.valueRandom = that.valueRandom;
654       this.valueZipf = that.valueZipf;
655       this.valueSize = that.valueSize;
656       this.period = that.period;
657       this.randomSleep = that.randomSleep;
658       this.addColumns = that.addColumns;
659       this.columns = that.columns;
660       this.caching = that.caching;
661     }
662 
663     public int getCaching() {
664       return this.caching;
665     }
666 
667     public void setCaching(final int caching) {
668       this.caching = caching;
669     }
670 
671     public int getColumns() {
672       return this.columns;
673     }
674 
675     public void setColumns(final int columns) {
676       this.columns = columns;
677     }
678 
679     public boolean isValueZipf() {
680       return valueZipf;
681     }
682 
683     public void setValueZipf(boolean valueZipf) {
684       this.valueZipf = valueZipf;
685     }
686 
687     public String getCmdName() {
688       return cmdName;
689     }
690 
691     public void setCmdName(String cmdName) {
692       this.cmdName = cmdName;
693     }
694 
695     public int getRandomSleep() {
696       return randomSleep;
697     }
698 
699     public void setRandomSleep(int randomSleep) {
700       this.randomSleep = randomSleep;
701     }
702 
703     public int getReplicas() {
704       return replicas;
705     }
706 
707     public void setReplicas(int replicas) {
708       this.replicas = replicas;
709     }
710 
711     public String getSplitPolicy() {
712       return splitPolicy;
713     }
714 
715     public void setSplitPolicy(String splitPolicy) {
716       this.splitPolicy = splitPolicy;
717     }
718 
719     public void setNomapred(boolean nomapred) {
720       this.nomapred = nomapred;
721     }
722 
723     public void setFilterAll(boolean filterAll) {
724       this.filterAll = filterAll;
725     }
726 
727     public void setStartRow(int startRow) {
728       this.startRow = startRow;
729     }
730 
731     public void setSize(float size) {
732       this.size = size;
733     }
734 
735     public void setPerClientRunRows(int perClientRunRows) {
736       this.perClientRunRows = perClientRunRows;
737     }
738 
739     public void setNumClientThreads(int numClientThreads) {
740       this.numClientThreads = numClientThreads;
741     }
742 
743     public void setTotalRows(int totalRows) {
744       this.totalRows = totalRows;
745     }
746 
747     public void setSampleRate(float sampleRate) {
748       this.sampleRate = sampleRate;
749     }
750 
751     public void setTraceRate(double traceRate) {
752       this.traceRate = traceRate;
753     }
754 
755     public void setTableName(String tableName) {
756       this.tableName = tableName;
757     }
758 
759     public void setFlushCommits(boolean flushCommits) {
760       this.flushCommits = flushCommits;
761     }
762 
763     public void setWriteToWAL(boolean writeToWAL) {
764       this.writeToWAL = writeToWAL;
765     }
766 
767     public void setAutoFlush(boolean autoFlush) {
768       this.autoFlush = autoFlush;
769     }
770 
771     public void setOneCon(boolean oneCon) {
772       this.oneCon = oneCon;
773     }
774 
775     public void setUseTags(boolean useTags) {
776       this.useTags = useTags;
777     }
778 
779     public void setNoOfTags(int noOfTags) {
780       this.noOfTags = noOfTags;
781     }
782 
783     public void setReportLatency(boolean reportLatency) {
784       this.reportLatency = reportLatency;
785     }
786 
787     public void setMultiGet(int multiGet) {
788       this.multiGet = multiGet;
789     }
790 
791     public void setInMemoryCF(boolean inMemoryCF) {
792       this.inMemoryCF = inMemoryCF;
793     }
794 
795     public void setPresplitRegions(int presplitRegions) {
796       this.presplitRegions = presplitRegions;
797     }
798 
799     public void setCompression(Compression.Algorithm compression) {
800       this.compression = compression;
801     }
802 
803     public void setBloomType(BloomType bloomType) {
804       this.bloomType = bloomType;
805     }
806 
807     public void setBlockEncoding(DataBlockEncoding blockEncoding) {
808       this.blockEncoding = blockEncoding;
809     }
810 
811     public void setValueRandom(boolean valueRandom) {
812       this.valueRandom = valueRandom;
813     }
814 
815     public void setValueSize(int valueSize) {
816       this.valueSize = valueSize;
817     }
818 
819     public void setPeriod(int period) {
820       this.period = period;
821     }
822 
823     public boolean isNomapred() {
824       return nomapred;
825     }
826 
827     public boolean isFilterAll() {
828       return filterAll;
829     }
830 
831     public int getStartRow() {
832       return startRow;
833     }
834 
835     public float getSize() {
836       return size;
837     }
838 
839     public int getPerClientRunRows() {
840       return perClientRunRows;
841     }
842 
843     public int getNumClientThreads() {
844       return numClientThreads;
845     }
846 
847     public int getTotalRows() {
848       return totalRows;
849     }
850 
851     public float getSampleRate() {
852       return sampleRate;
853     }
854 
855     public double getTraceRate() {
856       return traceRate;
857     }
858 
859     public String getTableName() {
860       return tableName;
861     }
862 
863     public boolean isFlushCommits() {
864       return flushCommits;
865     }
866 
867     public boolean isWriteToWAL() {
868       return writeToWAL;
869     }
870 
871     public boolean isAutoFlush() {
872       return autoFlush;
873     }
874 
875     public boolean isUseTags() {
876       return useTags;
877     }
878 
879     public int getNoOfTags() {
880       return noOfTags;
881     }
882 
883     public boolean isReportLatency() {
884       return reportLatency;
885     }
886 
887     public int getMultiGet() {
888       return multiGet;
889     }
890 
891     public boolean isInMemoryCF() {
892       return inMemoryCF;
893     }
894 
895     public int getPresplitRegions() {
896       return presplitRegions;
897     }
898 
899     public Compression.Algorithm getCompression() {
900       return compression;
901     }
902 
903     public DataBlockEncoding getBlockEncoding() {
904       return blockEncoding;
905     }
906 
907     public boolean isValueRandom() {
908       return valueRandom;
909     }
910 
911     public int getValueSize() {
912       return valueSize;
913     }
914 
915     public int getPeriod() {
916       return period;
917     }
918 
919     public BloomType getBloomType() {
920       return bloomType;
921     }
922 
923     public boolean isOneCon() {
924       return oneCon;
925     }
926 
927     public boolean getAddColumns() {
928       return addColumns;
929     }
930 
931     public void setAddColumns(boolean addColumns) {
932       this.addColumns = addColumns;
933     }
934   }
935 
936   /*
937    * A test.
938    * Subclass to particularize what happens per row.
939    */
940   static abstract class Test {
941     // The below makes it so that when Tests are all running in the one
942     // JVM, they each have a differently seeded Random.
943     private static final Random randomSeed = new Random(System.currentTimeMillis());
944 
945     private static long nextRandomSeed() {
946       return randomSeed.nextLong();
947     }
948     private final int everyN;
949 
950     protected final Random rand = new Random(nextRandomSeed());
951     protected final Configuration conf;
952     protected final TestOptions opts;
953 
954     private final Status status;
955     private final Sampler<?> traceSampler;
956     private final SpanReceiverHost receiverHost;
957     protected Connection connection;
958 
959     private String testName;
960     private Histogram latency;
961     private Histogram valueSize;
962     private RandomDistribution.Zipf zipf;
963 
964     /**
965      * Note that all subclasses of this class must provide a public constructor
966      * that has the exact same list of arguments.
967      */
968     Test(final Connection con, final TestOptions options, final Status status) {
969       this.connection = con;
970       this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration();
971       this.opts = options;
972       this.status = status;
973       this.testName = this.getClass().getSimpleName();
974       receiverHost = SpanReceiverHost.getInstance(conf);
975       if (options.traceRate >= 1.0) {
976         this.traceSampler = Sampler.ALWAYS;
977       } else if (options.traceRate > 0.0) {
978         conf.setDouble("hbase.sampler.fraction", options.traceRate);
979         this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
980       } else {
981         this.traceSampler = Sampler.NEVER;
982       }
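      // everyN works out to roughly 1/sampleRate; e.g. (illustrative) a sampleRate of
      // 0.1f yields everyN == 10, so testTimed() exercises only every 10th row.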
983       everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
984       if (options.isValueZipf()) {
985         this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.1);
986       }
987       LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
988     }
989 
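    // Value sizing: uniform-random in [0, opts.valueSize) when valueRandom is set,
    // Zipf-distributed when valueZipf is set, otherwise the fixed opts.valueSize.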
990     int getValueLength(final Random r) {
991       if (this.opts.isValueRandom()) return Math.abs(r.nextInt() % opts.valueSize);
992       else if (this.opts.isValueZipf()) return Math.abs(this.zipf.nextInt());
993       else return opts.valueSize;
994     }
995 
996     void updateValueSize(final Result [] rs) throws IOException {
997       if (rs == null || !isRandomValueSize()) return;
998       for (Result r: rs) updateValueSize(r);
999     }
1000 
1001     void updateValueSize(final Result r) throws IOException {
1002       if (r == null || !isRandomValueSize()) return;
1003       int size = 0;
1004       for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1005         size += scanner.current().getValueLength();
1006       }
1007       updateValueSize(size);
1008     }
1009 
1010     void updateValueSize(final int valueSize) {
1011       if (!isRandomValueSize()) return;
1012       this.valueSize.update(valueSize);
1013     }
1014 
1015     String generateStatus(final int sr, final int i, final int lr) {
1016       return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1017         (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1018     }
1019 
1020     boolean isRandomValueSize() {
1021       return opts.valueRandom;
1022     }
1023 
1024     protected int getReportingPeriod() {
1025       return opts.period;
1026     }
1027 
1028     /**
1029      * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1030      */
1031     public Histogram getLatency() {
1032       return latency;
1033     }
1034 
1035     void testSetup() throws IOException {
1036       if (!opts.oneCon) {
1037         this.connection = ConnectionFactory.createConnection(conf);
1038       }
1039       onStartup();
1040       latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
1041       valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
1042     }
1043 
1044     abstract void onStartup() throws IOException;
1045 
1046     void testTakedown() throws IOException {
1047       reportLatency();
1048       reportValueSize();
1049       onTakedown();
1050       if (!opts.oneCon) {
1051         connection.close();
1052       }
1053       receiverHost.closeReceivers();
1054     }
1055 
1056     abstract void onTakedown() throws IOException;
1057 
1058     /*
1059      * Run test
1060      * @return Elapsed time.
1061      * @throws IOException
1062      */
1063     long test() throws IOException, InterruptedException {
1064       testSetup();
1065       LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1066       final long startTime = System.nanoTime();
1067       try {
1068         testTimed();
1069       } finally {
1070         testTakedown();
1071       }
1072       return (System.nanoTime() - startTime) / 1000000;
1073     }
1074 
1075     int getStartRow() {
1076       return opts.startRow;
1077     }
1078 
1079     int getLastRow() {
1080       return getStartRow() + opts.perClientRunRows;
1081     }
1082 
1083     /**
1084      * Provides an extension point for tests that don't want a per row invocation.
1085      */
1086     void testTimed() throws IOException, InterruptedException {
1087       int startRow = getStartRow();
1088       int lastRow = getLastRow();
1089       // Report on completion of 1/10th of total.
1090       for (int i = startRow; i < lastRow; i++) {
1091         if (i % everyN != 0) continue;
1092         long startTime = System.nanoTime();
1093         TraceScope scope = Trace.startSpan("test row", traceSampler);
1094         try {
1095           testRow(i);
1096         } finally {
1097           scope.close();
1098         }
1099         latency.update((System.nanoTime() - startTime) / 1000);
1100         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1101           status.setStatus(generateStatus(startRow, i, lastRow));
1102         }
1103       }
1104     }
1105 
1106     /**
1107      * report percentiles of latency
1108      * @throws IOException
1109      */
1110     private void reportLatency() throws IOException {
1111       status.setStatus(testName + " latency log (microseconds), on " +
1112           latency.count() + " measures");
1113       reportHistogram(this.latency);
1114     }
1115 
1116     private void reportValueSize() throws IOException {
1117       status.setStatus(testName + " valueSize after " +
1118           valueSize.count() + " measures");
1119       reportHistogram(this.valueSize);
1120     }
1121 
1122     private void reportHistogram(final Histogram h) throws IOException {
1123       Snapshot sn = h.getSnapshot();
1124       status.setStatus(testName + " Min      = " + h.min());
1125       status.setStatus(testName + " Avg      = " + h.mean());
1126       status.setStatus(testName + " StdDev   = " + h.stdDev());
1127       status.setStatus(testName + " 50th     = " + sn.getMedian());
1128       status.setStatus(testName + " 75th     = " + sn.get75thPercentile());
1129       status.setStatus(testName + " 95th     = " + sn.get95thPercentile());
1130       status.setStatus(testName + " 99th     = " + sn.get99thPercentile());
1131       status.setStatus(testName + " 99.9th   = " + sn.get999thPercentile());
1132       status.setStatus(testName + " 99.99th  = " + sn.getValue(0.9999));
1133       status.setStatus(testName + " 99.999th = " + sn.getValue(0.99999));
1134       status.setStatus(testName + " Max      = " + h.max());
1135     }
1136 
1137     /**
1138      * @return Subset of the histogram's calculation.
1139      */
1140     public String getShortLatencyReport() {
1141       return YammerHistogramUtils.getShortHistogramReport(this.latency);
1142     }
1143 
1144     /**
1145      * @return Subset of the histogram's calculation.
1146      */
1147     public String getShortValueSizeReport() {
1148       return YammerHistogramUtils.getShortHistogramReport(this.valueSize);
1149     }
1150 
1151     /*
1152      * Test for individual row.
1153      * @param i Row index.
1154      */
1155     abstract void testRow(final int i) throws IOException, InterruptedException;
1156   }
1157 
1158   static abstract class TableTest extends Test {
1159     protected Table table;
1160 
1161     TableTest(Connection con, TestOptions options, Status status) {
1162       super(con, options, status);
1163     }
1164 
1165     @Override
1166     void onStartup() throws IOException {
1167       this.table = connection.getTable(TableName.valueOf(opts.tableName));
1168     }
1169 
1170     @Override
1171     void onTakedown() throws IOException {
1172       table.close();
1173     }
1174   }
1175 
1176   static abstract class BufferedMutatorTest extends Test {
1177     protected BufferedMutator mutator;
1178 
1179     BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1180       super(con, options, status);
1181     }
1182 
1183     @Override
1184     void onStartup() throws IOException {
1185       this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
1186     }
1187 
1188     @Override
1189     void onTakedown() throws IOException {
1190       mutator.close();
1191     }
1192   }
1193 
1194   static class RandomSeekScanTest extends TableTest {
1195     RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1196       super(con, options, status);
1197     }
1198 
1199     @Override
1200     void testRow(final int i) throws IOException {
1201       Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
1202       scan.setCaching(opts.caching);
1203       FilterList list = new FilterList();
1204       if (opts.addColumns) {
1205         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1206       } else {
1207         scan.addFamily(FAMILY_NAME);
1208       }
1209       if (opts.filterAll) {
1210         list.addFilter(new FilterAllFilter());
1211       }
1212       list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1213       scan.setFilter(list);
1214       ResultScanner s = this.table.getScanner(scan);
1215       for (Result rr; (rr = s.next()) != null;) {
1216         updateValueSize(rr);
1217       }
1218       s.close();
1219     }
1220 
1221     @Override
1222     protected int getReportingPeriod() {
1223       int period = opts.perClientRunRows / 100;
1224       return period == 0 ? opts.perClientRunRows : period;
1225     }
1226 
1227   }
1228 
1229   static abstract class RandomScanWithRangeTest extends TableTest {
1230     RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1231       super(con, options, status);
1232     }
1233 
1234     @Override
1235     void testRow(final int i) throws IOException {
1236       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1237       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
1238       scan.setCaching(opts.caching);
1239       if (opts.filterAll) {
1240         scan.setFilter(new FilterAllFilter());
1241       }
1242       if (opts.addColumns) {
1243         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1244       } else {
1245         scan.addFamily(FAMILY_NAME);
1246       }
1247       Result r = null;
1248       int count = 0;
1249       ResultScanner s = this.table.getScanner(scan);
1250       for (; (r = s.next()) != null;) {
1251         updateValueSize(r);
1252         count++;
1253       }
1254       if (i % 100 == 0) {
1255         LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1256             Bytes.toString(startAndStopRow.getFirst()),
1257             Bytes.toString(startAndStopRow.getSecond()), count));
1258       }
1259 
1260       s.close();
1261     }
1262 
1263     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1264 
1265     protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1266       int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1267       int stop = start + maxRange;
1268       return new Pair<byte[],byte[]>(format(start), format(stop));
1269     }
1270 
1271     @Override
1272     protected int getReportingPeriod() {
1273       int period = opts.perClientRunRows / 100;
1274       return period == 0? opts.perClientRunRows: period;
1275     }
1276   }
1277 
1278   static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1279     RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1280       super(con, options, status);
1281     }
1282 
1283     @Override
1284     protected Pair<byte[], byte[]> getStartAndStopRow() {
1285       return generateStartAndStopRows(10);
1286     }
1287   }
1288 
1289   static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1290     RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1291       super(con, options, status);
1292     }
1293 
1294     @Override
1295     protected Pair<byte[], byte[]> getStartAndStopRow() {
1296       return generateStartAndStopRows(100);
1297     }
1298   }
1299 
1300   static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1301     RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1302       super(con, options, status);
1303     }
1304 
1305     @Override
1306     protected Pair<byte[], byte[]> getStartAndStopRow() {
1307       return generateStartAndStopRows(1000);
1308     }
1309   }
1310 
1311   static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1312     RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1313       super(con, options, status);
1314     }
1315 
1316     @Override
1317     protected Pair<byte[], byte[]> getStartAndStopRow() {
1318       return generateStartAndStopRows(10000);
1319     }
1320   }
1321 
1322   static class RandomReadTest extends TableTest {
1323     private final Consistency consistency;
1324     private ArrayList<Get> gets;
1325     private Random rd = new Random();
1326 
1327     RandomReadTest(Connection con, TestOptions options, Status status) {
1328       super(con, options, status);
1329       consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1330       if (opts.multiGet > 0) {
1331         LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1332         this.gets = new ArrayList<Get>(opts.multiGet);
1333       }
1334     }
1335 
1336     @Override
1337     void testRow(final int i) throws IOException, InterruptedException {
1338       if (opts.randomSleep > 0) {
1339         Thread.sleep(rd.nextInt(opts.randomSleep));
1340       }
1341       Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1342       if (opts.addColumns) {
1343         get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1344       } else {
1345         get.addFamily(FAMILY_NAME);
1346       }
1347       if (opts.filterAll) {
1348         get.setFilter(new FilterAllFilter());
1349       }
1350       get.setConsistency(consistency);
1351       if (LOG.isTraceEnabled()) LOG.trace(get.toString());
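      // With multiGet enabled, gets are buffered and issued as one batch once
      // opts.multiGet of them accumulate; any remainder is flushed in testTakedown().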
1352       if (opts.multiGet > 0) {
1353         this.gets.add(get);
1354         if (this.gets.size() == opts.multiGet) {
1355           Result [] rs = this.table.get(this.gets);
1356           updateValueSize(rs);
1357           this.gets.clear();
1358         }
1359       } else {
1360         updateValueSize(this.table.get(get));
1361       }
1362     }
1363 
1364     @Override
1365     protected int getReportingPeriod() {
1366       int period = opts.perClientRunRows / 10;
1367       return period == 0 ? opts.perClientRunRows : period;
1368     }
1369 
1370     @Override
1371     protected void testTakedown() throws IOException {
1372       if (this.gets != null && this.gets.size() > 0) {
1373         this.table.get(gets);
1374         this.gets.clear();
1375       }
1376       super.testTakedown();
1377     }
1378   }
1379 
1380   static class RandomWriteTest extends BufferedMutatorTest {
1381     RandomWriteTest(Connection con, TestOptions options, Status status) {
1382       super(con, options, status);
1383     }
1384 
1385     @Override
1386     void testRow(final int i) throws IOException {
1387       byte[] row = getRandomRow(this.rand, opts.totalRows);
1388       Put put = new Put(row);
1389       for (int column = 0; column < opts.columns; column++) {
1390         byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1391         byte[] value = generateData(this.rand, getValueLength(this.rand));
1392         if (opts.useTags) {
1393           byte[] tag = generateData(this.rand, TAG_LENGTH);
1394           Tag[] tags = new Tag[opts.noOfTags];
1395           for (int n = 0; n < opts.noOfTags; n++) {
1396             Tag t = new Tag((byte) n, tag);
1397             tags[n] = t;
1398           }
1399           KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
1400               value, tags);
1401           put.add(kv);
1402           updateValueSize(kv.getValueLength());
1403         } else {
1404           put.add(FAMILY_NAME, qualifier, value);
1405           updateValueSize(value.length);
1406         }
1407       }
1408       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1409       mutator.mutate(put);
1410     }
1411   }
1412 
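       /** Scans forward from the start row, pulling one result per testRow invocation. */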
1413   static class ScanTest extends TableTest {
1414     private ResultScanner testScanner;
1415 
1416     ScanTest(Connection con, TestOptions options, Status status) {
1417       super(con, options, status);
1418     }
1419 
1420     @Override
1421     void testTakedown() throws IOException {
1422       if (this.testScanner != null) {
1423         this.testScanner.close();
1424       }
1425       super.testTakedown();
1426     }
1427
1429     @Override
1430     void testRow(final int i) throws IOException {
1431       if (this.testScanner == null) {
1432         Scan scan = new Scan(format(opts.startRow));
1433         scan.setCaching(opts.caching);
1434         if (opts.addColumns) {
1435           scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1436         } else {
1437           scan.addFamily(FAMILY_NAME);
1438         }
1439         if (opts.filterAll) {
1440           scan.setFilter(new FilterAllFilter());
1441         }
1442         this.testScanner = table.getScanner(scan);
1443       }
1444       Result r = testScanner.next();
1445       updateValueSize(r);
1446     }
1447   }
1448 
1449   /**
1450    * Base class for operations that are CAS-like: they read a value and then set it based on what
1451    * they read. In this category are increment, append, checkAndPut, etc.
1452    *
1453    * <p>These operations also need some concurrent access to be interesting. Usually each test
1454    * client operates in its own part of the key range; in CASTableTest subclasses they all overlap
1455    * on the same key space, via the getStartRow and getLastRow overrides.
1456    */
1457   static abstract class CASTableTest extends TableTest {
1458     private final byte [] qualifier;
1459     CASTableTest(Connection con, TestOptions options, Status status) {
1460       super(con, options, status);
1461       qualifier = Bytes.toBytes(this.getClass().getSimpleName());
1462     }
1463 
1464     byte [] getQualifier() {
1465       return this.qualifier;
1466     }
1467 
1468     @Override
1469     int getStartRow() {
1470       return 0;
1471     }
1472 
1473     @Override
1474     int getLastRow() {
1475       return opts.perClientRunRows;
1476     }
1477   }
1478 
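       /** Increments a counter column by one on each row in the shared CAS key space. */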
1479   static class IncrementTest extends CASTableTest {
1480     IncrementTest(Connection con, TestOptions options, Status status) {
1481       super(con, options, status);
1482     }
1483 
1484     @Override
1485     void testRow(final int i) throws IOException {
1486       Increment increment = new Increment(format(i));
1487       increment.addColumn(FAMILY_NAME, getQualifier(), 1L);
1488       updateValueSize(this.table.increment(increment));
1489     }
1490   }
1491 
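       /** Appends the row key bytes to a column on each row in the shared CAS key space. */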
1492   static class AppendTest extends CASTableTest {
1493     AppendTest(Connection con, TestOptions options, Status status) {
1494       super(con, options, status);
1495     }
1496 
1497     @Override
1498     void testRow(final int i) throws IOException {
1499       byte [] bytes = format(i);
1500       Append append = new Append(bytes);
1501       append.add(FAMILY_NAME, getQualifier(), bytes);
1502       updateValueSize(this.table.append(append));
1503     }
1504   }
1505 
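       /** Puts a known value, then issues a checkAndMutate of a RowMutations batch conditioned on that value. */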
1506   static class CheckAndMutateTest extends CASTableTest {
1507     CheckAndMutateTest(Connection con, TestOptions options, Status status) {
1508       super(con, options, status);
1509     }
1510 
1511     @Override
1512     void testRow(final int i) throws IOException {
1513       byte [] bytes = format(i);
1514       // Put a known value so when we go to check it, it is there.
1515       Put put = new Put(bytes);
1516       put.addColumn(FAMILY_NAME, getQualifier(), bytes);
1517       this.table.put(put);
1518       RowMutations mutations = new RowMutations(bytes);
1519       mutations.add(put);
1520       this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes,
1521           mutations);
1522     }
1523   }
1524 
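       /** Puts a known value, then issues a checkAndPut conditioned on that same value. */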
1525   static class CheckAndPutTest extends CASTableTest {
1526     CheckAndPutTest(Connection con, TestOptions options, Status status) {
1527       super(con, options, status);
1528     }
1529 
1530     @Override
1531     void testRow(final int i) throws IOException {
1532       byte [] bytes = format(i);
1533       // Put a known value so when we go to check it, it is there.
1534       Put put = new Put(bytes);
1535       put.addColumn(FAMILY_NAME, getQualifier(), bytes);
1536       this.table.put(put);
1537       this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, put);
1538     }
1539   }
1540 
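       /** Puts a known value, then issues a checkAndDelete conditioned on that same value. */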
1541   static class CheckAndDeleteTest extends CASTableTest {
1542     CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
1543       super(con, options, status);
1544     }
1545 
1546     @Override
1547     void testRow(final int i) throws IOException {
1548       byte [] bytes = format(i);
1549       // Put a known value so when we go to check it, it is there.
1550       Put put = new Put(bytes);
1551       put.addColumn(FAMILY_NAME, getQualifier(), bytes);
1552       this.table.put(put);
1553       Delete delete = new Delete(put.getRow());
1554       delete.addColumn(FAMILY_NAME, getQualifier());
1555       this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, delete);
1556     }
1557   }
1558 
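       /** Gets each row in sequence by its formatted row key. */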
1559   static class SequentialReadTest extends TableTest {
1560     SequentialReadTest(Connection con, TestOptions options, Status status) {
1561       super(con, options, status);
1562     }
1563 
1564     @Override
1565     void testRow(final int i) throws IOException {
1566       Get get = new Get(format(i));
1567       if (opts.addColumns) {
1568         get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1569       }
1570       if (opts.filterAll) {
1571         get.setFilter(new FilterAllFilter());
1572       }
1573       updateValueSize(table.get(get));
1574     }
1575   }
1576 
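       /** Writes values (and optionally tags) to sequential rows via the shared BufferedMutator. */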
1577   static class SequentialWriteTest extends BufferedMutatorTest {
1578     SequentialWriteTest(Connection con, TestOptions options, Status status) {
1579       super(con, options, status);
1580     }
1581 
1582     @Override
1583     void testRow(final int i) throws IOException {
1584       byte[] row = format(i);
1585       Put put = new Put(row);
1586       for (int column = 0; column < opts.columns; column++) {
1587         byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1588         byte[] value = generateData(this.rand, getValueLength(this.rand));
1589         if (opts.useTags) {
1590           byte[] tag = generateData(this.rand, TAG_LENGTH);
1591           Tag[] tags = new Tag[opts.noOfTags];
1592           for (int n = 0; n < opts.noOfTags; n++) {
1593             Tag t = new Tag((byte) n, tag);
1594             tags[n] = t;
1595           }
1596           KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
1597               value, tags);
1598           put.add(kv);
1599           updateValueSize(kv.getValueLength());
1600         } else {
1601           put.add(FAMILY_NAME, qualifier, value);
1602           updateValueSize(value.length);
1603         }
1604       }
1605       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1606       mutator.mutate(put);
1607     }
1608   }
1609 
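       /**
        * Scans the table once per testRow call, filtering server-side with a
        * SingleColumnValueFilter on a freshly generated value.
        */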
1610   static class FilteredScanTest extends TableTest {
1611     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
1612 
1613     FilteredScanTest(Connection con, TestOptions options, Status status) {
1614       super(con, options, status);
1615     }
1616 
1617     @Override
1618     void testRow(int i) throws IOException {
1619       byte[] value = generateData(this.rand, getValueLength(this.rand));
1620       Scan scan = constructScan(value);
1621       ResultScanner scanner = null;
1622       try {
1623         scanner = this.table.getScanner(scan);
1624         for (Result r = null; (r = scanner.next()) != null;) {
1625           updateValueSize(r);
1626         }
1627       } finally {
1628         if (scanner != null) scanner.close();
1629       }
1630     }
1631 
1632     protected Scan constructScan(byte[] valuePrefix) throws IOException {
1633       FilterList list = new FilterList();
1634       Filter filter = new SingleColumnValueFilter(
1635           FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
1636           new BinaryComparator(valuePrefix)
1637       );
1638       list.addFilter(filter);
1639       if (opts.filterAll) {
1640         list.addFilter(new FilterAllFilter());
1641       }
1642       Scan scan = new Scan();
1643       scan.setCaching(opts.caching);
1644       if (opts.addColumns) {
1645         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1646       } else {
1647         scan.addFamily(FAMILY_NAME);
1648       }
1649       scan.setFilter(list);
1650       return scan;
1651     }
1652   }
1653 
1654   /**
1655    * Compute a throughput rate in MB/s.
1656    * @param rows Number of records consumed.
1657    * @param timeMs Time taken in milliseconds.
        * @param valueSize Size in bytes of each value read or written.
        * @param columns Number of columns written per row.
1658    * @return String value with label, e.g. '123.76 MB/s'
1659    */
1660   private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
1661     BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
1662       ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
1663     BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
1664       .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
1665       .divide(BYTES_PER_MB, CXT);
1666     return FMT.format(mbps) + " MB/s";
1667   }
1668 
1669   /**
1670    * Format passed integer.
1671    * @param number Number to format.
1672    * @return Zero-prefixed, ROW_LENGTH-byte-wide decimal representation of the passed
1673    * number (the absolute value is used if the number is negative).
1674    */
1675   public static byte [] format(final int number) {
1676     byte [] b = new byte[ROW_LENGTH];
1677     int d = Math.abs(number);
1678     for (int i = b.length - 1; i >= 0; i--) {
1679       b[i] = (byte)((d % 10) + '0');
1680       d /= 10;
1681     }
1682     return b;
1683   }
1684 
1685   /**
1686    * This method takes some time and is called inline while uploading data. For
1687    * example, in the mapfile test, generating the key and the value consumes about
1688    * 30% of the CPU time.
1689    * @return Generated random value to insert into a table cell.
1690    */
1691   public static byte[] generateData(final Random r, int length) {
1692     byte [] b = new byte [length];
1693     int i;
1694 
1695     for(i = 0; i < (length-8); i += 8) {
1696       b[i] = (byte) (65 + r.nextInt(26));
1697       b[i+1] = b[i];
1698       b[i+2] = b[i];
1699       b[i+3] = b[i];
1700       b[i+4] = b[i];
1701       b[i+5] = b[i];
1702       b[i+6] = b[i];
1703       b[i+7] = b[i];
1704     }
1705 
1706     byte a = (byte) (65 + r.nextInt(26));
1707     for(; i < length; i++) {
1708       b[i] = a;
1709     }
1710     return b;
1711   }
1712 
1713   /**
1714    * @deprecated Use {@link #generateData(java.util.Random, int)} instead.
1715    * @return Generated random value to insert into a table cell.
1716    */
1717   @Deprecated
1718   public static byte[] generateValue(final Random r) {
1719     return generateData(r, DEFAULT_VALUE_LENGTH);
1720   }
1721 
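       /** @return the formatted key of a row chosen at random from [0, totalRows). */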
1722   static byte [] getRandomRow(final Random random, final int totalRows) {
1723     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1724   }
1725 
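       /**
        * Runs a single client instance of the passed test command.  The command class must declare a
        * (Connection, TestOptions, Status) constructor, e.g. RandomReadTest(Connection, TestOptions,
        * Status); it is looked up and invoked reflectively here.
        * @return the elapsed time and recorded latencies for the run
        */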
1726   static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con,
1727                            TestOptions opts, final Status status)
1728       throws IOException, InterruptedException {
1729     status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
1730       opts.perClientRunRows + " rows");
1731     long totalElapsedTime;
1732 
1733     final Test t;
1734     try {
1735       Constructor<? extends Test> constructor =
1736         cmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
1737       t = constructor.newInstance(con, opts, status);
1738     } catch (NoSuchMethodException e) {
1739       throw new IllegalArgumentException("Invalid command class: " +
1740           cmd.getName() + ".  It does not provide a constructor as described by " +
1741           "the javadoc comment.  Available constructors are: " +
1742           Arrays.toString(cmd.getConstructors()));
1743     } catch (Exception e) {
1744       throw new IllegalStateException("Failed to construct command class", e);
1745     }
1746     totalElapsedTime = t.test();
1747 
1748     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1749       "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
1750       " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
1751           getAverageValueLength(opts), opts.columns) + ")");
1752 
1753     return new RunResult(totalElapsedTime, t.getLatency());
1754   }
1755 
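       /** @return the expected average value length; random-length values average out to half of valueSize. */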
1756   private static int getAverageValueLength(final TestOptions opts) {
1757     return opts.valueRandom? opts.valueSize/2: opts.valueSize;
1758   }
1759 
1760   private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
1761       InterruptedException, ClassNotFoundException {
1762     // Log the configuration we're going to run with. Uses the JSON mapper because it will do
1763     // the TestOptions introspection for us and dump the output in a readable format.
1764     LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
1765     try(Connection conn = ConnectionFactory.createConnection(getConf());
1766         Admin admin = conn.getAdmin()) {
1767       checkTable(admin, opts);
1768     }
1769     if (opts.nomapred) {
1770       doLocalClients(opts, getConf());
1771     } else {
1772       doMapReduce(opts, getConf());
1773     }
1774   }
1775 
1776   protected void printUsage() {
1777     printUsage(this.getClass().getName(), null);
1778   }
1779 
1780   protected static void printUsage(final String message) {
1781     printUsage(PerformanceEvaluation.class.getName(), message);
1782   }
1783 
1784   protected static void printUsageAndExit(final String message, final int exitCode) {
1785     printUsage(message);
1786     System.exit(exitCode);
1787   }
1788 
1789   protected static void printUsage(final String className, final String message) {
1790     if (message != null && message.length() > 0) {
1791       System.err.println(message);
1792     }
1793     System.err.println("Usage: java " + className + " \\");
1794     System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
1795     System.err.println();
1796     System.err.println("Options:");
1797     System.err.println(" nomapred        Run multiple clients using threads " +
1798       "(rather than use mapreduce)");
1799     System.err.println(" rows            Rows each client runs. Default: One million");
1800     System.err.println(" size            Total size in GiB. Mutually exclusive with --rows. " +
1801       "Default: 1.0.");
1802     System.err.println(" sampleRate      Execute test on a sample of total " +
1803       "rows. Only supported by randomRead. Default: 1.0");
1804     System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
1805       "Default: 0");
1806     System.err.println(" table           Alternate table name. Default: 'TestTable'");
1807     System.err.println(" multiGet        If >0, when doing RandomRead, batch gets together into " +
1808       "groups of this size instead of doing single gets. Default: 0 (disabled)");
1809     System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
1810     System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
1811       "Default: false");
1812     System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
1813     System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
1814     System.err.println(" oneCon          All the threads share the same connection. Default: False");
1815     System.err.println(" presplit        Create presplit table. Recommended for accurate perf " +
1816       "analysis (see guide).  Default: disabled");
1817     System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
1818       "in memory as much as possible. Reads are not guaranteed to always be served " +
1819       "from memory.  Default: false");
1820     System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
1821       "Default: false");
1822     System.err.println(" numoftags       Specify the number of tags to add to each KV. " +
1823       "This works only if usetags is true.");
1824     System.err.println(" filterAll       Helps to filter out all the rows on the server side"
1825         + " thereby not returning anything back to the client.  Helps to check the server-side"
1826         + " performance.  Uses FilterAllFilter internally. ");
1827     System.err.println(" latency         Set to report operation latencies. Default: False");
1828     System.err.println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
1829     System.err.println(" valueSize       Pass value size to use: Default: 1024");
1830     System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
1831         "'valueSize'; set on read for stats on size: Default: Not set.");
1832     System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
1833         "'valueSize' in zipf form: Default: Not set.");
1834     System.err.println(" period          Report every 'period' rows: " +
1835       "Default: opts.perClientRunRows / 10");
1838     System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
1839     System.err.println(" replicas        Enable region replica testing. Default: 1.");
1840     System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
1841     System.err.println(" randomSleep     Sleep a random number of milliseconds, between 0 and the entered value, before each get. Default: 0");
1842     System.err.println(" columns         Columns to write per row. Default: 1");
1843     System.err.println(" caching         Scan caching to use. Default: 30");
1844     System.err.println();
1845     System.err.println(" Note: -D properties will be applied to the conf used. ");
1846     System.err.println("  For example: ");
1847     System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
1848     System.err.println("   -Dmapreduce.task.timeout=60000");
1849     System.err.println();
1850     System.err.println("Command:");
1851     for (CmdDescriptor command : COMMANDS.values()) {
1852       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1853     }
1854     System.err.println();
1855     System.err.println("Args:");
1856     System.err.println(" nclients        Integer. Required. Total number of " +
1857       "clients (and HRegionServers)");
1858     System.err.println("                 running: 1 <= value <= 500");
1859     System.err.println("Examples:");
1860     System.err.println(" To run a single client doing the default 1M sequentialWrites:");
1861     System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
1862     System.err.println(" To run 10 clients doing increments over ten rows:");
1863     System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10");
1864   }
1865 
1866   /**
1867    * Parse options passed in via an arguments array. Assumes that array has been split
1868    * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
1869    * in the queue at the conclusion of this method call. It's up to the caller to deal
1870    * with these unrecognized arguments.
1871    */
1872   static TestOptions parseOpts(Queue<String> args) {
1873     TestOptions opts = new TestOptions();
1874 
1875     String cmd = null;
1876     while ((cmd = args.poll()) != null) {
1877       if (cmd.equals("-h") || cmd.startsWith("--h")) {
1878         // place item back onto queue so that caller knows parsing was incomplete
1879         args.add(cmd);
1880         break;
1881       }
1882 
1883       final String nmr = "--nomapred";
1884       if (cmd.startsWith(nmr)) {
1885         opts.nomapred = true;
1886         continue;
1887       }
1888 
1889       final String rows = "--rows=";
1890       if (cmd.startsWith(rows)) {
1891         opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
1892         continue;
1893       }
1894 
1895       final String sampleRate = "--sampleRate=";
1896       if (cmd.startsWith(sampleRate)) {
1897         opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
1898         continue;
1899       }
1900 
1901       final String table = "--table=";
1902       if (cmd.startsWith(table)) {
1903         opts.tableName = cmd.substring(table.length());
1904         continue;
1905       }
1906 
1907       final String startRow = "--startRow=";
1908       if (cmd.startsWith(startRow)) {
1909         opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
1910         continue;
1911       }
1912 
1913       final String compress = "--compress=";
1914       if (cmd.startsWith(compress)) {
1915         opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
1916         continue;
1917       }
1918 
1919       final String traceRate = "--traceRate=";
1920       if (cmd.startsWith(traceRate)) {
1921         opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
1922         continue;
1923       }
1924 
1925       final String blockEncoding = "--blockEncoding=";
1926       if (cmd.startsWith(blockEncoding)) {
1927         opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
1928         continue;
1929       }
1930 
1931       final String flushCommits = "--flushCommits=";
1932       if (cmd.startsWith(flushCommits)) {
1933         opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1934         continue;
1935       }
1936 
1937       final String writeToWAL = "--writeToWAL=";
1938       if (cmd.startsWith(writeToWAL)) {
1939         opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1940         continue;
1941       }
1942 
1943       final String presplit = "--presplit=";
1944       if (cmd.startsWith(presplit)) {
1945         opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
1946         continue;
1947       }
1948 
1949       final String inMemory = "--inmemory=";
1950       if (cmd.startsWith(inMemory)) {
1951         opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
1952         continue;
1953       }
1954 
1955       final String autoFlush = "--autoFlush=";
1956       if (cmd.startsWith(autoFlush)) {
1957         opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
1958         continue;
1959       }
1960 
1961       final String onceCon = "--oneCon=";
1962       if (cmd.startsWith(onceCon)) {
1963         opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
1964         continue;
1965       }
1966 
1967       final String latency = "--latency";
1968       if (cmd.startsWith(latency)) {
1969         opts.reportLatency = true;
1970         continue;
1971       }
1972 
1973       final String multiGet = "--multiGet=";
1974       if (cmd.startsWith(multiGet)) {
1975         opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
1976         continue;
1977       }
1978 
1979       final String useTags = "--usetags=";
1980       if (cmd.startsWith(useTags)) {
1981         opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
1982         continue;
1983       }
1984 
1985       final String noOfTags = "--numoftags=";
1986       if (cmd.startsWith(noOfTags)) {
1987         opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
1988         continue;
1989       }
1990 
1991       final String replicas = "--replicas=";
1992       if (cmd.startsWith(replicas)) {
1993         opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
1994         continue;
1995       }
1996 
1997       final String filterOutAll = "--filterAll";
1998       if (cmd.startsWith(filterOutAll)) {
1999         opts.filterAll = true;
2000         continue;
2001       }
2002 
2003       final String size = "--size=";
2004       if (cmd.startsWith(size)) {
2005         opts.size = Float.parseFloat(cmd.substring(size.length()));
2006         continue;
2007       }
2008 
2009       final String splitPolicy = "--splitPolicy=";
2010       if (cmd.startsWith(splitPolicy)) {
2011         opts.splitPolicy = cmd.substring(splitPolicy.length());
2012         continue;
2013       }
2014 
2015       final String randomSleep = "--randomSleep=";
2016       if (cmd.startsWith(randomSleep)) {
2017         opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2018         continue;
2019       }
2020 
2021       final String bloomFilter = "--bloomFilter=";
2022       if (cmd.startsWith(bloomFilter)) {
2023         opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2024         continue;
2025       }
2026 
2027       final String valueSize = "--valueSize=";
2028       if (cmd.startsWith(valueSize)) {
2029         opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2030         continue;
2031       }
2032 
2033       final String valueRandom = "--valueRandom";
2034       if (cmd.startsWith(valueRandom)) {
2035         opts.valueRandom = true;
2036         if (opts.valueZipf) {
2037           throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2038         }
2039         continue;
2040       }
2041 
2042       final String valueZipf = "--valueZipf";
2043       if (cmd.startsWith(valueZipf)) {
2044         opts.valueZipf = true;
2045         if (opts.valueRandom) {
2046           throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2047         }
2048         continue;
2049       }
2050 
2051       final String period = "--period=";
2052       if (cmd.startsWith(period)) {
2053         opts.period = Integer.parseInt(cmd.substring(period.length()));
2054         continue;
2055       }
2056 
2057       final String addColumns = "--addColumns=";
2058       if (cmd.startsWith(addColumns)) {
2059         opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2060         continue;
2061       }
2062 
2063       final String columns = "--columns=";
2064       if (cmd.startsWith(columns)) {
2065         opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2066         continue;
2067       }
2068 
2069       final String caching = "--caching=";
2070       if (cmd.startsWith(caching)) {
2071         opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2072         continue;
2073       }
2074 
2075       if (isCommandClass(cmd)) {
2076         opts.cmdName = cmd;
2077         opts.numClientThreads = Integer.parseInt(args.remove());
2078         int rowsPerGB = getRowsPerGB(opts);
2079         if (opts.size != DEFAULT_OPTS.size &&
2080             opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2081           throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
2082         }
2083         if (opts.size != DEFAULT_OPTS.size) {
2084           // total size in GB specified
2085           opts.totalRows = (int) (opts.size * rowsPerGB);
2086           opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2087         } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2088           // number of rows specified
2089           opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2090           opts.size = (float) opts.totalRows / rowsPerGB;
2091         }
2092         break;
2093       } else {
2094         printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2095       }
2101     }
2102     return opts;
2103   }
2104 
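       /** @return the number of rows of the configured value size and column count that fit in one GB. */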
2105   static int getRowsPerGB(final TestOptions opts) {
2106     return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns());
2107   }
2108 
2109   @Override
2110   public int run(String[] args) throws Exception {
2111     // Process command-line args. TODO: Better cmd-line processing
2112     // (but hopefully something not as painful as cli options).
2113     int errCode = -1;
2114     if (args.length < 1) {
2115       printUsage();
2116       return errCode;
2117     }
2118 
2119     try {
2120       LinkedList<String> argv = new LinkedList<String>();
2121       argv.addAll(Arrays.asList(args));
2122       TestOptions opts = parseOpts(argv);
2123 
2124       // args remaining, print help and exit
2125       if (!argv.isEmpty()) {
2126         errCode = 0;
2127         printUsage();
2128         return errCode;
2129       }
2130 
2131       // must run at least 1 client
2132       if (opts.numClientThreads <= 0) {
2133         throw new IllegalArgumentException("Number of clients must be > 0");
2134       }
2135 
2136       Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
2137       if (cmdClass != null) {
2138         runTest(cmdClass, opts);
2139         errCode = 0;
2140       }
2141 
2142     } catch (Exception e) {
2143       e.printStackTrace();
2144     }
2145 
2146     return errCode;
2147   }
2148 
2149   private static boolean isCommandClass(String cmd) {
2150     return COMMANDS.containsKey(cmd);
2151   }
2152 
2153   private static Class<? extends Test> determineCommandClass(String cmd) {
2154     CmdDescriptor descriptor = COMMANDS.get(cmd);
2155     return descriptor != null ? descriptor.getCmdClass() : null;
2156   }
2157 
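       /**
        * Command-line entry point, e.g.
        * {@code bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 1}.
        * See printUsage for the full list of options and commands.
        */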
2158   public static void main(final String[] args) throws Exception {
2159     int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2160     System.exit(res);
2161   }
2162 }
2163