1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.rest;
20 
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.conf.Configured;
25 import org.apache.hadoop.fs.FSDataInputStream;
26 import org.apache.hadoop.fs.FileStatus;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.Tag;
36 import org.apache.hadoop.hbase.client.BufferedMutator;
37 import org.apache.hadoop.hbase.client.Connection;
38 import org.apache.hadoop.hbase.client.ConnectionFactory;
39 import org.apache.hadoop.hbase.client.Durability;
40 import org.apache.hadoop.hbase.client.Get;
41 import org.apache.hadoop.hbase.client.Put;
42 import org.apache.hadoop.hbase.client.Result;
43 import org.apache.hadoop.hbase.client.ResultScanner;
44 import org.apache.hadoop.hbase.client.Scan;
45 import org.apache.hadoop.hbase.client.Table;
46 import org.apache.hadoop.hbase.filter.BinaryComparator;
47 import org.apache.hadoop.hbase.filter.CompareFilter;
48 import org.apache.hadoop.hbase.filter.Filter;
49 import org.apache.hadoop.hbase.filter.PageFilter;
50 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
51 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
52 import org.apache.hadoop.hbase.io.compress.Compression;
53 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
54 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
55 import org.apache.hadoop.hbase.rest.client.Client;
56 import org.apache.hadoop.hbase.rest.client.Cluster;
57 import org.apache.hadoop.hbase.rest.client.RemoteAdmin;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.Hash;
60 import org.apache.hadoop.hbase.util.MurmurHash;
61 import org.apache.hadoop.hbase.util.Pair;
62 import org.apache.hadoop.io.LongWritable;
63 import org.apache.hadoop.io.NullWritable;
64 import org.apache.hadoop.io.Text;
65 import org.apache.hadoop.io.Writable;
66 import org.apache.hadoop.mapreduce.InputSplit;
67 import org.apache.hadoop.mapreduce.Job;
68 import org.apache.hadoop.mapreduce.JobContext;
69 import org.apache.hadoop.mapreduce.Mapper;
70 import org.apache.hadoop.mapreduce.RecordReader;
71 import org.apache.hadoop.mapreduce.TaskAttemptContext;
72 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
73 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
74 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
75 import org.apache.hadoop.util.LineReader;
76 import org.apache.hadoop.util.Tool;
77 import org.apache.hadoop.util.ToolRunner;
78 
79 import java.io.DataInput;
80 import java.io.DataOutput;
81 import java.io.IOException;
82 import java.io.PrintStream;
83 import java.lang.reflect.Constructor;
84 import java.text.SimpleDateFormat;
85 import java.util.ArrayList;
86 import java.util.Arrays;
87 import java.util.Date;
88 import java.util.List;
89 import java.util.Map;
90 import java.util.Random;
91 import java.util.TreeMap;
92 import java.util.regex.Matcher;
93 import java.util.regex.Pattern;
94 
95 /**
 * Script used for evaluating Stargate performance and scalability.  Runs a SG
97  * client that steps through one of a set of hardcoded tests or 'experiments'
98  * (e.g. a random reads test, a random writes test, etc.). Pass on the
99  * command-line which test to run and how many clients are participating in
100  * this experiment. Run <code>java PerformanceEvaluation --help</code> to
101  * obtain usage.
102  *
103  * <p>This class sets up and runs the evaluation programs described in
104  * Section 7, <i>Performance Evaluation</i>, of the <a
105  * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
106  * paper, pages 8-10.
107  *
 * <p>If number of clients > 1, we start up a MapReduce job (or, when MapReduce is
 * disabled, one thread per client in this JVM). Each map task or thread runs an
 * individual client. Each client does about 1GB of data.
110  */
111 public class PerformanceEvaluation extends Configured implements Tool {
112   protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
113 
114   private static final int DEFAULT_ROW_PREFIX_LENGTH = 16;
115   private static final int ROW_LENGTH = 1000;
116   private static final int TAG_LENGTH = 256;
117   private static final int ONE_GB = 1024 * 1024 * 1000;
118   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
119 
120   public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
121   public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
122   public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
123   private TableName tableName = TABLE_NAME;
124 
125   protected HTableDescriptor TABLE_DESCRIPTOR;
126   protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
127   protected static Cluster cluster = new Cluster();
128 
129   volatile Configuration conf;
130   private boolean nomapred = false;
131   private int N = 1;
132   private int R = ROWS_PER_GB;
133   private Compression.Algorithm compression = Compression.Algorithm.NONE;
134   private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
135   private boolean flushCommits = true;
136   private boolean writeToWAL = true;
137   private boolean inMemoryCF = false;
138   private int presplitRegions = 0;
139   private boolean useTags = false;
140   private int noOfTags = 1;
141   private Connection connection;
142 
143   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
144   /**
145    * Regex to parse lines in input file passed to mapreduce task.
146    */
147   public static final Pattern LINE_PATTERN =
148       Pattern.compile("tableName=(\\w+),\\s+" +
149           "startRow=(\\d+),\\s+" +
150           "perClientRunRows=(\\d+),\\s+" +
151           "totalRows=(\\d+),\\s+" +
152           "clients=(\\d+),\\s+" +
153           "flushCommits=(\\w+),\\s+" +
154           "writeToWAL=(\\w+),\\s+" +
155           "useTags=(\\w+),\\s+" +
156           "noOfTags=(\\d+)");
157 
158   /**
159    * Enum for map metrics.  Keep it out here rather than inside in the Map
160    * inner-class so we can find associated properties.
161    */
162   protected static enum Counter {
163     /** elapsed time */
164     ELAPSED_TIME,
165     /** number of rows */
166     ROWS}
167 
168   /**
169    * Constructor
170    * @param c Configuration object
171    */
PerformanceEvaluation(final Configuration c)172   public PerformanceEvaluation(final Configuration c) {
173     this.conf = c;
174 
175     addCommandDescriptor(RandomReadTest.class, "randomRead",
176         "Run random read test");
177     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
178         "Run random seek and scan 100 test");
179     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
180         "Run random seek scan with both start and stop row (max 10 rows)");
181     addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
182         "Run random seek scan with both start and stop row (max 100 rows)");
183     addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
184         "Run random seek scan with both start and stop row (max 1000 rows)");
185     addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
186         "Run random seek scan with both start and stop row (max 10000 rows)");
187     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
188         "Run random write test");
189     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
190         "Run sequential read test");
191     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
192         "Run sequential write test");
193     addCommandDescriptor(ScanTest.class, "scan",
194         "Run scan test (read every row)");
195     addCommandDescriptor(FilteredScanTest.class, "filterScan",
196         "Run scan test using a filter to find a specific row based " +
197         "on it's value (make sure to use --rows=20)");
198   }
199 
addCommandDescriptor(Class<? extends Test> cmdClass, String name, String description)200   protected void addCommandDescriptor(Class<? extends Test> cmdClass,
201       String name, String description) {
202     CmdDescriptor cmdDescriptor =
203       new CmdDescriptor(cmdClass, name, description);
204     commands.put(name, cmdDescriptor);
205   }
206 
207   /**
208    * Implementations can have their status set.
209    */
210   interface Status {
211     /**
212      * Sets status
213      * @param msg status message
214      * @throws IOException
215      */
setStatus(final String msg)216     void setStatus(final String msg) throws IOException;
217   }
218 
219   /**
220    *  This class works as the InputSplit of Performance Evaluation
221    *  MapReduce InputFormat, and the Record Value of RecordReader.
222    *  Each map task will only read one record from a PeInputSplit,
223    *  the record value is the PeInputSplit itself.
224    */
225   public static class PeInputSplit extends InputSplit implements Writable {
226     private TableName tableName = TABLE_NAME;
227     private int startRow = 0;
228     private int rows = 0;
229     private int totalRows = 0;
230     private int clients = 0;
231     private boolean flushCommits = false;
232     private boolean writeToWAL = true;
233     private boolean useTags = false;
234     private int noOfTags = 0;
235 
PeInputSplit()236     public PeInputSplit() {
237     }
238 
PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags)239     public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients,
240         boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) {
241       this.tableName = tableName;
242       this.startRow = startRow;
243       this.rows = rows;
244       this.totalRows = totalRows;
245       this.clients = clients;
246       this.flushCommits = flushCommits;
247       this.writeToWAL = writeToWAL;
248       this.useTags = useTags;
249       this.noOfTags = noOfTags;
250     }
251 
252     @Override
readFields(DataInput in)253     public void readFields(DataInput in) throws IOException {
254       int tableNameLen = in.readInt();
255       byte[] name = new byte[tableNameLen];
256       in.readFully(name);
257       this.tableName = TableName.valueOf(name);
258       this.startRow = in.readInt();
259       this.rows = in.readInt();
260       this.totalRows = in.readInt();
261       this.clients = in.readInt();
262       this.flushCommits = in.readBoolean();
263       this.writeToWAL = in.readBoolean();
264       this.useTags = in.readBoolean();
265       this.noOfTags = in.readInt();
266     }
267 
268     @Override
write(DataOutput out)269     public void write(DataOutput out) throws IOException {
270       byte[] name = this.tableName.toBytes();
271       out.writeInt(name.length);
272       out.write(name);
273       out.writeInt(startRow);
274       out.writeInt(rows);
275       out.writeInt(totalRows);
276       out.writeInt(clients);
277       out.writeBoolean(flushCommits);
278       out.writeBoolean(writeToWAL);
279       out.writeBoolean(useTags);
280       out.writeInt(noOfTags);
281     }
282 
283     @Override
getLength()284     public long getLength() throws IOException, InterruptedException {
285       return 0;
286     }
287 
288     @Override
getLocations()289     public String[] getLocations() throws IOException, InterruptedException {
290       return new String[0];
291     }
292 
getStartRow()293     public int getStartRow() {
294       return startRow;
295     }
296 
getTableName()297     public TableName getTableName() {
298       return tableName;
299     }
300 
getRows()301     public int getRows() {
302       return rows;
303     }
304 
getTotalRows()305     public int getTotalRows() {
306       return totalRows;
307     }
308 
getClients()309     public int getClients() {
310       return clients;
311     }
312 
isFlushCommits()313     public boolean isFlushCommits() {
314       return flushCommits;
315     }
316 
isWriteToWAL()317     public boolean isWriteToWAL() {
318       return writeToWAL;
319     }
320 
isUseTags()321     public boolean isUseTags() {
322       return useTags;
323     }
324 
getNoOfTags()325     public int getNoOfTags() {
326       return noOfTags;
327     }
328   }
329 
330   /**
331    *  InputFormat of Performance Evaluation MapReduce job.
332    *  It extends from FileInputFormat, want to use it's methods such as setInputPaths().
333    */
334   public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
335 
336     @Override
getSplits(JobContext job)337     public List<InputSplit> getSplits(JobContext job) throws IOException {
338       // generate splits
339       List<InputSplit> splitList = new ArrayList<InputSplit>();
340 
341       for (FileStatus file: listStatus(job)) {
342         if (file.isDirectory()) {
343           continue;
344         }
345         Path path = file.getPath();
346         FileSystem fs = path.getFileSystem(job.getConfiguration());
347         FSDataInputStream fileIn = fs.open(path);
348         LineReader in = new LineReader(fileIn, job.getConfiguration());
349         int lineLen = 0;
350         while(true) {
351           Text lineText = new Text();
352           lineLen = in.readLine(lineText);
353           if(lineLen <= 0) {
354           break;
355           }
356           Matcher m = LINE_PATTERN.matcher(lineText.toString());
357           if((m != null) && m.matches()) {
358             TableName tableName = TableName.valueOf(m.group(1));
359             int startRow = Integer.parseInt(m.group(2));
360             int rows = Integer.parseInt(m.group(3));
361             int totalRows = Integer.parseInt(m.group(4));
362             int clients = Integer.parseInt(m.group(5));
363             boolean flushCommits = Boolean.parseBoolean(m.group(6));
364             boolean writeToWAL = Boolean.parseBoolean(m.group(7));
365             boolean useTags = Boolean.parseBoolean(m.group(8));
366             int noOfTags = Integer.parseInt(m.group(9));
367 
368             LOG.debug("tableName=" + tableName +
369                       " split["+ splitList.size() + "] " +
370                       " startRow=" + startRow +
371                       " rows=" + rows +
372                       " totalRows=" + totalRows +
373                       " clients=" + clients +
374                       " flushCommits=" + flushCommits +
375                       " writeToWAL=" + writeToWAL +
376                       " useTags=" + useTags +
377                       " noOfTags=" + noOfTags);
378 
379             PeInputSplit newSplit =
380               new PeInputSplit(tableName, startRow, rows, totalRows, clients,
381                   flushCommits, writeToWAL, useTags, noOfTags);
382             splitList.add(newSplit);
383           }
384         }
385         in.close();
386       }
387 
388       LOG.info("Total # of splits: " + splitList.size());
389       return splitList;
390     }
391 
392     @Override
createRecordReader(InputSplit split, TaskAttemptContext context)393     public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
394                             TaskAttemptContext context) {
395       return new PeRecordReader();
396     }
397 
398     public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
399       private boolean readOver = false;
400       private PeInputSplit split = null;
401       private NullWritable key = null;
402       private PeInputSplit value = null;
403 
404       @Override
initialize(InputSplit split, TaskAttemptContext context)405       public void initialize(InputSplit split, TaskAttemptContext context)
406                   throws IOException, InterruptedException {
407         this.readOver = false;
408         this.split = (PeInputSplit)split;
409       }
410 
411       @Override
nextKeyValue()412       public boolean nextKeyValue() throws IOException, InterruptedException {
413         if(readOver) {
414           return false;
415         }
416 
417         key = NullWritable.get();
418         value = (PeInputSplit)split;
419 
420         readOver = true;
421         return true;
422       }
423 
424       @Override
getCurrentKey()425       public NullWritable getCurrentKey() throws IOException, InterruptedException {
426         return key;
427       }
428 
429       @Override
getCurrentValue()430       public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
431         return value;
432       }
433 
434       @Override
getProgress()435       public float getProgress() throws IOException, InterruptedException {
436         if(readOver) {
437           return 1.0f;
438         } else {
439           return 0.0f;
440         }
441       }
442 
443       @Override
close()444       public void close() throws IOException {
445         // do nothing
446       }
447     }
448   }
449 
450   /**
451    * MapReduce job that runs a performance evaluation client in each map task.
452    */
453   public static class EvaluationMapTask
454       extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
455 
456     /** configuration parameter name that contains the command */
457     public final static String CMD_KEY = "EvaluationMapTask.command";
458     /** configuration parameter name that contains the PE impl */
459     public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
460 
461     private Class<? extends Test> cmd;
462     private PerformanceEvaluation pe;
463 
464     @Override
setup(Context context)465     protected void setup(Context context) throws IOException, InterruptedException {
466       this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
467 
468       // this is required so that extensions of PE are instantiated within the
469       // map reduce task...
470       Class<? extends PerformanceEvaluation> peClass =
471           forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
472       try {
473         this.pe = peClass.getConstructor(Configuration.class)
474             .newInstance(context.getConfiguration());
475       } catch (Exception e) {
476         throw new IllegalStateException("Could not instantiate PE instance", e);
477       }
478     }
479 
forName(String className, Class<Type> type)480     private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
481       Class<? extends Type> clazz = null;
482       try {
483         clazz = Class.forName(className).asSubclass(type);
484       } catch (ClassNotFoundException e) {
485         throw new IllegalStateException("Could not find class for name: " + className, e);
486       }
487       return clazz;
488     }
489 
map(NullWritable key, PeInputSplit value, final Context context)490     protected void map(NullWritable key, PeInputSplit value, final Context context)
491            throws IOException, InterruptedException {
492 
493       Status status = new Status() {
494         public void setStatus(String msg) {
495            context.setStatus(msg);
496         }
497       };
498 
499       // Evaluation task
500       pe.tableName = value.getTableName();
501       long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
502         value.getRows(), value.getTotalRows(),
503         value.isFlushCommits(), value.isWriteToWAL(),
504         value.isUseTags(), value.getNoOfTags(),
505         ConnectionFactory.createConnection(context.getConfiguration()), status);
506       // Collect how much time the thing took. Report as map output and
507       // to the ELAPSED_TIME counter.
508       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
509       context.getCounter(Counter.ROWS).increment(value.rows);
510       context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
511       context.progress();
512     }
513   }
514 
515   /*
516    * If table does not already exist, create.
517    * @param c Client to use checking.
518    * @return True if we created the table.
519    * @throws IOException
520    */
checkTable(RemoteAdmin admin)521   private boolean checkTable(RemoteAdmin admin) throws IOException {
522     HTableDescriptor tableDescriptor = getTableDescriptor();
523     if (this.presplitRegions > 0) {
524       // presplit requested
525       if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) {
526         admin.deleteTable(tableDescriptor.getTableName().getName());
527       }
528 
529       byte[][] splits = getSplits();
530       for (int i=0; i < splits.length; i++) {
531         LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
532       }
533       admin.createTable(tableDescriptor);
534       LOG.info ("Table created with " + this.presplitRegions + " splits");
535     } else {
536       boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName());
537       if (!tableExists) {
538         admin.createTable(tableDescriptor);
539         LOG.info("Table " + tableDescriptor + " created");
540       }
541     }
542     boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName());
543     return tableExists;
544   }
545 
getTableDescriptor()546   protected HTableDescriptor getTableDescriptor() {
547     if (TABLE_DESCRIPTOR == null) {
548       TABLE_DESCRIPTOR = new HTableDescriptor(tableName);
549       HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
550       family.setDataBlockEncoding(blockEncoding);
551       family.setCompressionType(compression);
552       if (inMemoryCF) {
553         family.setInMemory(true);
554       }
555       TABLE_DESCRIPTOR.addFamily(family);
556     }
557     return TABLE_DESCRIPTOR;
558   }
559 
560   /**
561    * Generates splits based on total number of rows and specified split regions
562    *
563    * @return splits : array of byte []
564    */
getSplits()565   protected  byte[][] getSplits() {
566     if (this.presplitRegions == 0)
567       return new byte [0][];
568 
569     int numSplitPoints = presplitRegions - 1;
570     byte[][] splits = new byte[numSplitPoints][];
571     int jump = this.R  / this.presplitRegions;
572     for (int i=0; i < numSplitPoints; i++) {
573       int rowkey = jump * (1 + i);
574       splits[i] = format(rowkey);
575     }
576     return splits;
577   }
578 
579   /*
580    * We're to run multiple clients concurrently.  Setup a mapreduce job.  Run
581    * one map per client.  Then run a single reduce to sum the elapsed times.
582    * @param cmd Command to run.
583    * @throws IOException
584    */
runNIsMoreThanOne(final Class<? extends Test> cmd)585   private void runNIsMoreThanOne(final Class<? extends Test> cmd)
586   throws IOException, InterruptedException, ClassNotFoundException {
587     RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf());
588     checkTable(remoteAdmin);
589     if (nomapred) {
590       doMultipleClients(cmd);
591     } else {
592       doMapReduce(cmd);
593     }
594   }
595 
596   /*
597    * Run all clients in this vm each to its own thread.
598    * @param cmd Command to run.
599    * @throws IOException
600    */
doMultipleClients(final Class<? extends Test> cmd)601   private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
602     final List<Thread> threads = new ArrayList<Thread>(this.N);
603     final long[] timings = new long[this.N];
604     final int perClientRows = R/N;
605     final TableName tableName = this.tableName;
606     final DataBlockEncoding encoding = this.blockEncoding;
607     final boolean flushCommits = this.flushCommits;
608     final Compression.Algorithm compression = this.compression;
609     final boolean writeToWal = this.writeToWAL;
610     final int preSplitRegions = this.presplitRegions;
611     final boolean useTags = this.useTags;
612     final int numTags = this.noOfTags;
613     final Connection connection = ConnectionFactory.createConnection(getConf());
614     for (int i = 0; i < this.N; i++) {
615       final int index = i;
616       Thread t = new Thread ("TestClient-" + i) {
617         @Override
618         public void run() {
619           super.run();
620           PerformanceEvaluation pe = new PerformanceEvaluation(getConf());
621           pe.tableName = tableName;
622           pe.blockEncoding = encoding;
623           pe.flushCommits = flushCommits;
624           pe.compression = compression;
625           pe.writeToWAL = writeToWal;
626           pe.presplitRegions = preSplitRegions;
627           pe.N = N;
628           pe.connection = connection;
629           pe.useTags = useTags;
630           pe.noOfTags = numTags;
631           try {
632             long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
633                 perClientRows, R,
634                  flushCommits, writeToWAL, useTags, noOfTags, connection, new Status() {
635                    public void setStatus(final String msg) throws IOException {
636                      LOG.info("client-" + getName() + " " + msg);
637                    }
638                  });
639             timings[index] = elapsedTime;
640             LOG.info("Finished " + getName() + " in " + elapsedTime +
641               "ms writing " + perClientRows + " rows");
642           } catch (IOException e) {
643             throw new RuntimeException(e);
644           }
645         }
646       };
647       threads.add(t);
648     }
649     for (Thread t: threads) {
650       t.start();
651     }
652     for (Thread t: threads) {
653       while(t.isAlive()) {
654         try {
655           t.join();
656         } catch (InterruptedException e) {
657           LOG.debug("Interrupted, continuing" + e.toString());
658         }
659       }
660     }
661     final String test = cmd.getSimpleName();
662     LOG.info("[" + test + "] Summary of timings (ms): "
663              + Arrays.toString(timings));
664     Arrays.sort(timings);
665     long total = 0;
666     for (int i = 0; i < this.N; i++) {
667       total += timings[i];
668     }
669     LOG.info("[" + test + "]"
670              + "\tMin: " + timings[0] + "ms"
671              + "\tMax: " + timings[this.N - 1] + "ms"
672              + "\tAvg: " + (total / this.N) + "ms");
673   }
674 
675   /*
676    * Run a mapreduce job.  Run as many maps as asked-for clients.
677    * Before we start up the job, write out an input file with instruction
678    * per client regards which row they are to start on.
679    * @param cmd Command to run.
680    * @throws IOException
681    */
doMapReduce(final Class<? extends Test> cmd)682   private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
683         InterruptedException, ClassNotFoundException {
684     Configuration conf = getConf();
685     Path inputDir = writeInputFile(conf);
686     conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
687     conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
688     Job job = Job.getInstance(conf);
689     job.setJarByClass(PerformanceEvaluation.class);
690     job.setJobName("HBase Performance Evaluation");
691 
692     job.setInputFormatClass(PeInputFormat.class);
693     PeInputFormat.setInputPaths(job, inputDir);
694 
695     job.setOutputKeyClass(LongWritable.class);
696     job.setOutputValueClass(LongWritable.class);
697 
698     job.setMapperClass(EvaluationMapTask.class);
699     job.setReducerClass(LongSumReducer.class);
700     job.setNumReduceTasks(1);
701 
702     job.setOutputFormatClass(TextOutputFormat.class);
703     TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
704     TableMapReduceUtil.addDependencyJars(job);
705     TableMapReduceUtil.initCredentials(job);
706     job.waitForCompletion(true);
707   }
708 
709   /*
710    * Write input file of offsets-per-client for the mapreduce job.
711    * @param c Configuration
712    * @return Directory that contains file written.
713    * @throws IOException
714    */
writeInputFile(final Configuration c)715   private Path writeInputFile(final Configuration c) throws IOException {
716     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
717     Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
718     Path inputDir = new Path(jobdir, "inputs");
719 
720     FileSystem fs = FileSystem.get(c);
721     fs.mkdirs(inputDir);
722     Path inputFile = new Path(inputDir, "input.txt");
723     PrintStream out = new PrintStream(fs.create(inputFile));
724     // Make input random.
725     Map<Integer, String> m = new TreeMap<Integer, String>();
726     Hash h = MurmurHash.getInstance();
727     int perClientRows = (this.R / this.N);
728     try {
729       for (int i = 0; i < 10; i++) {
730         for (int j = 0; j < N; j++) {
731           String s = "tableName=" + this.tableName +
732           ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
733           ", perClientRunRows=" + (perClientRows / 10) +
734           ", totalRows=" + this.R +
735           ", clients=" + this.N +
736           ", flushCommits=" + this.flushCommits +
737           ", writeToWAL=" + this.writeToWAL +
738           ", useTags=" + this.useTags +
739           ", noOfTags=" + this.noOfTags;
740           int hash = h.hash(Bytes.toBytes(s));
741           m.put(hash, s);
742         }
743       }
744       for (Map.Entry<Integer, String> e: m.entrySet()) {
745         out.println(e.getValue());
746       }
747     } finally {
748       out.close();
749     }
750     return inputDir;
751   }
752 
753   /**
754    * Describes a command.
755    */
756   static class CmdDescriptor {
757     private Class<? extends Test> cmdClass;
758     private String name;
759     private String description;
760 
CmdDescriptor(Class<? extends Test> cmdClass, String name, String description)761     CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
762       this.cmdClass = cmdClass;
763       this.name = name;
764       this.description = description;
765     }
766 
getCmdClass()767     public Class<? extends Test> getCmdClass() {
768       return cmdClass;
769     }
770 
getName()771     public String getName() {
772       return name;
773     }
774 
getDescription()775     public String getDescription() {
776       return description;
777     }
778   }
779 
780   /**
   * Wraps up options passed to {@link Test
   * tests}.  This makes the reflection logic a little easier to understand...
783    */
784   static class TestOptions {
785     private int startRow;
786     private int perClientRunRows;
787     private int totalRows;
788     private int numClientThreads;
789     private TableName tableName;
790     private boolean flushCommits;
791     private boolean writeToWAL = true;
792     private boolean useTags = false;
793     private int noOfTags = 0;
794     private Connection connection;
795 
TestOptions()796     TestOptions() {
797     }
798 
TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, Connection connection)799     TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads,
800         TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags,
801         int noOfTags, Connection connection) {
802       this.startRow = startRow;
803       this.perClientRunRows = perClientRunRows;
804       this.totalRows = totalRows;
805       this.numClientThreads = numClientThreads;
806       this.tableName = tableName;
807       this.flushCommits = flushCommits;
808       this.writeToWAL = writeToWAL;
809       this.useTags = useTags;
810       this.noOfTags = noOfTags;
811       this.connection = connection;
812     }
813 
getStartRow()814     public int getStartRow() {
815       return startRow;
816     }
817 
getPerClientRunRows()818     public int getPerClientRunRows() {
819       return perClientRunRows;
820     }
821 
getTotalRows()822     public int getTotalRows() {
823       return totalRows;
824     }
825 
getNumClientThreads()826     public int getNumClientThreads() {
827       return numClientThreads;
828     }
829 
getTableName()830     public TableName getTableName() {
831       return tableName;
832     }
833 
isFlushCommits()834     public boolean isFlushCommits() {
835       return flushCommits;
836     }
837 
isWriteToWAL()838     public boolean isWriteToWAL() {
839       return writeToWAL;
840     }
841 
getConnection()842     public Connection getConnection() {
843       return connection;
844     }
845 
isUseTags()846     public boolean isUseTags() {
847       return this.useTags;
848     }
849 
getNumTags()850     public int getNumTags() {
851       return this.noOfTags;
852     }
853   }
854 
855   /*
856    * A test.
857    * Subclass to particularize what happens per row.
858    */
859   static abstract class Test {
860     // Below is make it so when Tests are all running in the one
861     // jvm, that they each have a differently seeded Random.
862     private static final Random randomSeed =
863       new Random(System.currentTimeMillis());
nextRandomSeed()864     private static long nextRandomSeed() {
865       return randomSeed.nextLong();
866     }
867     protected final Random rand = new Random(nextRandomSeed());
868 
869     protected final int startRow;
870     protected final int perClientRunRows;
871     protected final int totalRows;
872     private final Status status;
873     protected TableName tableName;
874     protected volatile Configuration conf;
875     protected boolean writeToWAL;
876     protected boolean useTags;
877     protected int noOfTags;
878     protected Connection connection;
879 
880     /**
881      * Note that all subclasses of this class must provide a public contructor
882      * that has the exact same list of arguments.
883      */
Test(final Configuration conf, final TestOptions options, final Status status)884     Test(final Configuration conf, final TestOptions options, final Status status) {
885       super();
886       this.startRow = options.getStartRow();
887       this.perClientRunRows = options.getPerClientRunRows();
888       this.totalRows = options.getTotalRows();
889       this.status = status;
890       this.tableName = options.getTableName();
891       this.conf = conf;
892       this.writeToWAL = options.isWriteToWAL();
893       this.useTags = options.isUseTags();
894       this.noOfTags = options.getNumTags();
895       this.connection = options.getConnection();
896     }
897 
generateStatus(final int sr, final int i, final int lr)898     protected String generateStatus(final int sr, final int i, final int lr) {
899       return sr + "/" + i + "/" + lr;
900     }
901 
getReportingPeriod()902     protected int getReportingPeriod() {
903       int period = this.perClientRunRows / 10;
904       return period == 0? this.perClientRunRows: period;
905     }
906 
testTakedown()907     abstract void testTakedown()  throws IOException;
908     /*
909      * Run test
910      * @return Elapsed time.
911      * @throws IOException
912      */
test()913     long test() throws IOException {
914       testSetup();
915       LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
916       final long startTime = System.nanoTime();
917       try {
918         testTimed();
919       } finally {
920         testTakedown();
921       }
922       return (System.nanoTime() - startTime) / 1000000;
923     }
924 
testSetup()925     abstract void testSetup() throws IOException;
926 
927     /**
928      * Provides an extension point for tests that don't want a per row invocation.
929      */
testTimed()930     void testTimed() throws IOException {
931       int lastRow = this.startRow + this.perClientRunRows;
932       // Report on completion of 1/10th of total.
933       for (int i = this.startRow; i < lastRow; i++) {
934         testRow(i);
935         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
936           status.setStatus(generateStatus(this.startRow, i, lastRow));
937         }
938       }
939     }
940 
941     /*
942     * Test for individual row.
943     * @param i Row index.
944     */
testRow(final int i)945     abstract void testRow(final int i) throws IOException;
946   }
947 
948   static abstract class TableTest extends Test {
949     protected Table table;
950 
TableTest(Configuration conf, TestOptions options, Status status)951     public TableTest(Configuration conf, TestOptions options, Status status) {
952       super(conf, options, status);
953     }
954 
testSetup()955     void testSetup() throws IOException {
956       this.table = connection.getTable(tableName);
957     }
958 
959     @Override
testTakedown()960     void testTakedown() throws IOException {
961       table.close();
962     }
963   }
964 
965   static abstract class BufferedMutatorTest extends Test {
966     protected BufferedMutator mutator;
967     protected boolean flushCommits;
968 
BufferedMutatorTest(Configuration conf, TestOptions options, Status status)969     public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) {
970       super(conf, options, status);
971       this.flushCommits = options.isFlushCommits();
972     }
973 
testSetup()974     void testSetup() throws IOException {
975       this.mutator = connection.getBufferedMutator(tableName);
976     }
977 
testTakedown()978     void testTakedown()  throws IOException {
979       if (flushCommits) {
980         this.mutator.flush();
981       }
982       mutator.close();
983     }
984   }
985 
986   static class RandomSeekScanTest extends TableTest {
RandomSeekScanTest(Configuration conf, TestOptions options, Status status)987     RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
988       super(conf, options, status);
989     }
990 
991     @Override
testRow(final int i)992     void testRow(final int i) throws IOException {
993       Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
994       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
995       scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
996       ResultScanner s = this.table.getScanner(scan);
997       s.close();
998     }
999 
1000     @Override
getReportingPeriod()1001     protected int getReportingPeriod() {
1002       int period = this.perClientRunRows / 100;
1003       return period == 0? this.perClientRunRows: period;
1004     }
1005 
1006   }
1007 
1008   @SuppressWarnings("unused")
1009   static abstract class RandomScanWithRangeTest extends TableTest {
RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status)1010     RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
1011       super(conf, options, status);
1012     }
1013 
1014     @Override
testRow(final int i)1015     void testRow(final int i) throws IOException {
1016       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1017       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
1018       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1019       ResultScanner s = this.table.getScanner(scan);
1020       int count = 0;
1021       for (Result rr = null; (rr = s.next()) != null;) {
1022         count++;
1023       }
1024 
1025       if (i % 100 == 0) {
1026         LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1027             Bytes.toString(startAndStopRow.getFirst()),
1028             Bytes.toString(startAndStopRow.getSecond()), count));
1029       }
1030 
1031       s.close();
1032     }
1033 
getStartAndStopRow()1034     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1035 
generateStartAndStopRows(int maxRange)1036     protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1037       int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
1038       int stop = start + maxRange;
1039       return new Pair<byte[],byte[]>(format(start), format(stop));
1040     }
1041 
1042     @Override
getReportingPeriod()1043     protected int getReportingPeriod() {
1044       int period = this.perClientRunRows / 100;
1045       return period == 0? this.perClientRunRows: period;
1046     }
1047   }
1048 
1049   static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status)1050     RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
1051       super(conf, options, status);
1052     }
1053 
1054     @Override
getStartAndStopRow()1055     protected Pair<byte[], byte[]> getStartAndStopRow() {
1056       return generateStartAndStopRows(10);
1057     }
1058   }
1059 
1060   static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status)1061     RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
1062       super(conf, options, status);
1063     }
1064 
1065     @Override
getStartAndStopRow()1066     protected Pair<byte[], byte[]> getStartAndStopRow() {
1067       return generateStartAndStopRows(100);
1068     }
1069   }
1070 
1071   static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status)1072     RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
1073       super(conf, options, status);
1074     }
1075 
1076     @Override
getStartAndStopRow()1077     protected Pair<byte[], byte[]> getStartAndStopRow() {
1078       return generateStartAndStopRows(1000);
1079     }
1080   }
1081 
1082   static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status)1083     RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
1084       super(conf, options, status);
1085     }
1086 
1087     @Override
getStartAndStopRow()1088     protected Pair<byte[], byte[]> getStartAndStopRow() {
1089       return generateStartAndStopRows(10000);
1090     }
1091   }
1092 
1093   static class RandomReadTest extends TableTest {
RandomReadTest(Configuration conf, TestOptions options, Status status)1094     RandomReadTest(Configuration conf, TestOptions options, Status status) {
1095       super(conf, options, status);
1096     }
1097 
1098     @Override
testRow(final int i)1099     void testRow(final int i) throws IOException {
1100       Get get = new Get(getRandomRow(this.rand, this.totalRows));
1101       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1102       this.table.get(get);
1103     }
1104 
1105     @Override
getReportingPeriod()1106     protected int getReportingPeriod() {
1107       int period = this.perClientRunRows / 100;
1108       return period == 0? this.perClientRunRows: period;
1109     }
1110 
1111   }
1112 
1113   static class RandomWriteTest extends BufferedMutatorTest {
RandomWriteTest(Configuration conf, TestOptions options, Status status)1114     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
1115       super(conf, options, status);
1116     }
1117 
1118     @Override
testRow(final int i)1119     void testRow(final int i) throws IOException {
1120       byte[] row = getRandomRow(this.rand, this.totalRows);
1121       Put put = new Put(row);
1122       byte[] value = generateData(this.rand, ROW_LENGTH);
1123       if (useTags) {
1124         byte[] tag = generateData(this.rand, TAG_LENGTH);
1125         Tag[] tags = new Tag[noOfTags];
1126         for (int n = 0; n < noOfTags; n++) {
1127           Tag t = new Tag((byte) n, tag);
1128           tags[n] = t;
1129         }
1130         KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
1131             value, tags);
1132         put.add(kv);
1133       } else {
1134         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
1135       }
1136       put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1137       mutator.mutate(put);
1138     }
1139   }
1140 
1141   static class ScanTest extends TableTest {
1142     private ResultScanner testScanner;
1143 
ScanTest(Configuration conf, TestOptions options, Status status)1144     ScanTest(Configuration conf, TestOptions options, Status status) {
1145       super(conf, options, status);
1146     }
1147 
1148     @Override
testTakedown()1149     void testTakedown() throws IOException {
1150       if (this.testScanner != null) {
1151         this.testScanner.close();
1152       }
1153       super.testTakedown();
1154     }
1155 
1156 
1157     @Override
testRow(final int i)1158     void testRow(final int i) throws IOException {
1159       if (this.testScanner == null) {
1160         Scan scan = new Scan(format(this.startRow));
1161         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1162         this.testScanner = table.getScanner(scan);
1163       }
1164       testScanner.next();
1165     }
1166 
1167   }
1168 
1169   static class SequentialReadTest extends TableTest {
SequentialReadTest(Configuration conf, TestOptions options, Status status)1170     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
1171       super(conf, options, status);
1172     }
1173 
1174     @Override
testRow(final int i)1175     void testRow(final int i) throws IOException {
1176       Get get = new Get(format(i));
1177       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1178       table.get(get);
1179     }
1180 
1181   }
1182 
1183   static class SequentialWriteTest extends BufferedMutatorTest {
1184 
SequentialWriteTest(Configuration conf, TestOptions options, Status status)1185     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
1186       super(conf, options, status);
1187     }
1188 
1189     @Override
testRow(final int i)1190     void testRow(final int i) throws IOException {
1191       byte[] row = format(i);
1192       Put put = new Put(row);
1193       byte[] value = generateData(this.rand, ROW_LENGTH);
1194       if (useTags) {
1195         byte[] tag = generateData(this.rand, TAG_LENGTH);
1196         Tag[] tags = new Tag[noOfTags];
1197         for (int n = 0; n < noOfTags; n++) {
1198           Tag t = new Tag((byte) n, tag);
1199           tags[n] = t;
1200         }
1201         KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
1202             value, tags);
1203         put.add(kv);
1204       } else {
1205         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
1206       }
1207       put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1208       mutator.mutate(put);
1209     }
1210   }
1211 
1212   static class FilteredScanTest extends TableTest {
1213     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
1214 
FilteredScanTest(Configuration conf, TestOptions options, Status status)1215     FilteredScanTest(Configuration conf, TestOptions options, Status status) {
1216       super(conf, options, status);
1217     }
1218 
1219     @Override
testRow(int i)1220     void testRow(int i) throws IOException {
1221       byte[] value = generateValue(this.rand);
1222       Scan scan = constructScan(value);
1223       ResultScanner scanner = null;
1224       try {
1225         scanner = this.table.getScanner(scan);
1226         while (scanner.next() != null) {
1227         }
1228       } finally {
1229         if (scanner != null) scanner.close();
1230       }
1231     }
1232 
constructScan(byte[] valuePrefix)1233     protected Scan constructScan(byte[] valuePrefix) throws IOException {
1234       Filter filter = new SingleColumnValueFilter(
1235           FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
1236           new BinaryComparator(valuePrefix)
1237       );
1238       Scan scan = new Scan();
1239       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1240       scan.setFilter(filter);
1241       return scan;
1242     }
1243   }
1244 
1245   /*
1246    * Format passed integer.
1247    * @param number
1248    * @return Returns zero-prefixed 10-byte wide decimal version of passed
1249    * number (Does absolute in case number is negative).
1250    */
format(final int number)1251   public static byte [] format(final int number) {
1252     byte [] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10];
1253     int d = Math.abs(number);
1254     for (int i = b.length - 1; i >= 0; i--) {
1255       b[i] = (byte)((d % 10) + '0');
1256       d /= 10;
1257     }
1258     return b;
1259   }
1260 
generateData(final Random r, int length)1261   public static byte[] generateData(final Random r, int length) {
1262     byte [] b = new byte [length];
1263     int i = 0;
1264 
1265     for(i = 0; i < (length-8); i += 8) {
1266       b[i] = (byte) (65 + r.nextInt(26));
1267       b[i+1] = b[i];
1268       b[i+2] = b[i];
1269       b[i+3] = b[i];
1270       b[i+4] = b[i];
1271       b[i+5] = b[i];
1272       b[i+6] = b[i];
1273       b[i+7] = b[i];
1274     }
1275 
1276     byte a = (byte) (65 + r.nextInt(26));
1277     for(; i < length; i++) {
1278       b[i] = a;
1279     }
1280     return b;
1281   }
1282 
generateValue(final Random r)1283   public static byte[] generateValue(final Random r) {
1284     byte [] b = new byte [ROW_LENGTH];
1285     r.nextBytes(b);
1286     return b;
1287   }
1288 
getRandomRow(final Random random, final int totalRows)1289   static byte [] getRandomRow(final Random random, final int totalRows) {
1290     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1291   }
1292 
runOneClient(final Class<? extends Test> cmd, final int startRow, final int perClientRunRows, final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, Connection connection, final Status status)1293   long runOneClient(final Class<? extends Test> cmd, final int startRow,
1294       final int perClientRunRows, final int totalRows,
1295       boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
1296       Connection connection, final Status status)
1297   throws IOException {
1298     status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
1299       perClientRunRows + " rows");
1300     long totalElapsedTime = 0;
1301 
1302     TestOptions options = new TestOptions(startRow, perClientRunRows,
1303       totalRows, N, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection);
1304     final Test t;
1305     try {
1306       Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
1307           Configuration.class, TestOptions.class, Status.class);
1308       t = constructor.newInstance(this.conf, options, status);
1309     } catch (NoSuchMethodException e) {
1310       throw new IllegalArgumentException("Invalid command class: " +
1311           cmd.getName() + ".  It does not provide a constructor as described by" +
1312           "the javadoc comment.  Available constructors are: " +
1313           Arrays.toString(cmd.getConstructors()));
1314     } catch (Exception e) {
1315       throw new IllegalStateException("Failed to construct command class", e);
1316     }
1317     totalElapsedTime = t.test();
1318 
1319     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1320       "ms at offset " + startRow + " for " + perClientRunRows + " rows");
1321     return totalElapsedTime;
1322   }
1323 
runNIsOne(final Class<? extends Test> cmd)1324   private void runNIsOne(final Class<? extends Test> cmd) {
1325     Status status = new Status() {
1326       public void setStatus(String msg) throws IOException {
1327         LOG.info(msg);
1328       }
1329     };
1330 
1331     RemoteAdmin admin = null;
1332     try {
1333       Client client = new Client(cluster);
1334       admin = new RemoteAdmin(client, getConf());
1335       checkTable(admin);
1336       runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL,
1337         this.useTags, this.noOfTags, this.connection, status);
1338     } catch (Exception e) {
1339       LOG.error("Failed", e);
1340     }
1341   }
1342 
runTest(final Class<? extends Test> cmd)1343   private void runTest(final Class<? extends Test> cmd) throws IOException,
1344           InterruptedException, ClassNotFoundException {
1345     if (N == 1) {
1346       // If there is only one client and one HRegionServer, we assume nothing
1347       // has been set up at all.
1348       runNIsOne(cmd);
1349     } else {
1350       // Else, run
1351       runNIsMoreThanOne(cmd);
1352     }
1353   }
1354 
printUsage()1355   protected void printUsage() {
1356     printUsage(null);
1357   }
1358 
printUsage(final String message)1359   protected void printUsage(final String message) {
1360     if (message != null && message.length() > 0) {
1361       System.err.println(message);
1362     }
1363     System.err.println("Usage: java " + this.getClass().getName() + " \\");
1364     System.err.println("  [--nomapred] [--rows=ROWS] [--table=NAME] \\");
1365     System.err.println("  [--compress=TYPE] [--blockEncoding=TYPE] " +
1366       "[-D<property=value>]* <command> <nclients>");
1367     System.err.println();
1368     System.err.println("Options:");
1369     System.err.println(" nomapred        Run multiple clients using threads " +
1370       "(rather than use mapreduce)");
1371     System.err.println(" rows            Rows each client runs. Default: One million");
1372     System.err.println(" table           Alternate table name. Default: 'TestTable'");
1373     System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
1374     System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
1375       "Default: false");
1376     System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
1377     System.err.println(" presplit        Create presplit table. Recommended for accurate perf " +
1378       "analysis (see guide).  Default: disabled");
1379     System.err.println(" inmemory        Tries to keep the HFiles of the CF inmemory as far as " +
1380       "possible.  Not guaranteed that reads are always served from inmemory.  Default: false");
1381     System.err.println(" usetags         Writes tags along with KVs.  Use with HFile V3. " +
1382       "Default : false");
1383     System.err.println(" numoftags        Specify the no of tags that would be needed. " +
1384       "This works only if usetags is true.");
1385     System.err.println();
1386     System.err.println(" Note: -D properties will be applied to the conf used. ");
1387     System.err.println("  For example: ");
1388     System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
1389     System.err.println("   -Dmapreduce.task.timeout=60000");
1390     System.err.println();
1391     System.err.println("Command:");
1392     for (CmdDescriptor command : commands.values()) {
1393       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1394     }
1395     System.err.println();
1396     System.err.println("Args:");
1397     System.err.println(" nclients      Integer. Required. Total number of " +
1398       "clients (and HRegionServers)");
1399     System.err.println("               running: 1 <= value <= 500");
1400     System.err.println("Examples:");
1401     System.err.println(" To run a single evaluation client:");
1402     System.err.println(" $ bin/hbase " + this.getClass().getName()
1403         + " sequentialWrite 1");
1404   }
1405 
getArgs(final int start, final String[] args)1406   private void getArgs(final int start, final String[] args) {
1407     if(start + 1 > args.length) {
1408       throw new IllegalArgumentException("must supply the number of clients");
1409     }
1410     N = Integer.parseInt(args[start]);
1411     if (N < 1) {
1412       throw new IllegalArgumentException("Number of clients must be > 1");
1413     }
1414     // Set total number of rows to write.
1415     R = R * N;
1416   }
1417 
1418   @Override
run(String[] args)1419   public int run(String[] args) throws Exception {
1420     // Process command-line args. TODO: Better cmd-line processing
1421     // (but hopefully something not as painful as cli options).
1422     int errCode = -1;
1423     if (args.length < 1) {
1424       printUsage();
1425       return errCode;
1426     }
1427 
1428     try {
1429       for (int i = 0; i < args.length; i++) {
1430         String cmd = args[i];
1431         if (cmd.equals("-h") || cmd.startsWith("--h")) {
1432           printUsage();
1433           errCode = 0;
1434           break;
1435         }
1436 
1437         final String nmr = "--nomapred";
1438         if (cmd.startsWith(nmr)) {
1439           nomapred = true;
1440           continue;
1441         }
1442 
1443         final String rows = "--rows=";
1444         if (cmd.startsWith(rows)) {
1445           R = Integer.parseInt(cmd.substring(rows.length()));
1446           continue;
1447         }
1448 
1449         final String table = "--table=";
1450         if (cmd.startsWith(table)) {
1451           this.tableName = TableName.valueOf(cmd.substring(table.length()));
1452           continue;
1453         }
1454 
1455         final String compress = "--compress=";
1456         if (cmd.startsWith(compress)) {
1457           this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
1458           continue;
1459         }
1460 
1461         final String blockEncoding = "--blockEncoding=";
1462         if (cmd.startsWith(blockEncoding)) {
1463           this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
1464           continue;
1465         }
1466 
1467         final String flushCommits = "--flushCommits=";
1468         if (cmd.startsWith(flushCommits)) {
1469           this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1470           continue;
1471         }
1472 
1473         final String writeToWAL = "--writeToWAL=";
1474         if (cmd.startsWith(writeToWAL)) {
1475           this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1476           continue;
1477         }
1478 
1479         final String presplit = "--presplit=";
1480         if (cmd.startsWith(presplit)) {
1481           this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
1482           continue;
1483         }
1484 
1485         final String inMemory = "--inmemory=";
1486         if (cmd.startsWith(inMemory)) {
1487           this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
1488           continue;
1489         }
1490 
1491         this.connection = ConnectionFactory.createConnection(getConf());
1492 
1493         final String useTags = "--usetags=";
1494         if (cmd.startsWith(useTags)) {
1495           this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
1496           continue;
1497         }
1498 
1499         final String noOfTags = "--nooftags=";
1500         if (cmd.startsWith(noOfTags)) {
1501           this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
1502           continue;
1503         }
1504 
1505         final String host = "--host=";
1506         if (cmd.startsWith(host)) {
1507           cluster.add(cmd.substring(host.length()));
1508           continue;
1509         }
1510 
1511         Class<? extends Test> cmdClass = determineCommandClass(cmd);
1512         if (cmdClass != null) {
1513           getArgs(i + 1, args);
1514           if (cluster.isEmpty()) {
1515             String s = conf.get("stargate.hostname", "localhost");
1516             if (s.contains(":")) {
1517               cluster.add(s);
1518             } else {
1519               cluster.add(s, conf.getInt("stargate.port", 8080));
1520             }
1521           }
1522           runTest(cmdClass);
1523           errCode = 0;
1524           break;
1525         }
1526 
1527         printUsage();
1528         break;
1529       }
1530     } catch (Exception e) {
1531       LOG.error("Failed", e);
1532     }
1533 
1534     return errCode;
1535   }
1536 
determineCommandClass(String cmd)1537   private Class<? extends Test> determineCommandClass(String cmd) {
1538     CmdDescriptor descriptor = commands.get(cmd);
1539     return descriptor != null ? descriptor.getCmdClass() : null;
1540   }
1541 
1542   /**
1543    * @param args
1544    */
main(final String[] args)1545   public static void main(final String[] args) throws Exception {
1546     int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
1547     System.exit(res);
1548   }
1549 }
1550