1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.rest; 20 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 import org.apache.hadoop.conf.Configuration; 24 import org.apache.hadoop.conf.Configured; 25 import org.apache.hadoop.fs.FSDataInputStream; 26 import org.apache.hadoop.fs.FileStatus; 27 import org.apache.hadoop.fs.FileSystem; 28 import org.apache.hadoop.fs.Path; 29 import org.apache.hadoop.hbase.HBaseConfiguration; 30 import org.apache.hadoop.hbase.HColumnDescriptor; 31 import org.apache.hadoop.hbase.HConstants; 32 import org.apache.hadoop.hbase.HTableDescriptor; 33 import org.apache.hadoop.hbase.KeyValue; 34 import org.apache.hadoop.hbase.TableName; 35 import org.apache.hadoop.hbase.Tag; 36 import org.apache.hadoop.hbase.client.BufferedMutator; 37 import org.apache.hadoop.hbase.client.Connection; 38 import org.apache.hadoop.hbase.client.ConnectionFactory; 39 import org.apache.hadoop.hbase.client.Durability; 40 import org.apache.hadoop.hbase.client.Get; 41 import org.apache.hadoop.hbase.client.Put; 42 import org.apache.hadoop.hbase.client.Result; 43 import org.apache.hadoop.hbase.client.ResultScanner; 44 import org.apache.hadoop.hbase.client.Scan; 45 import org.apache.hadoop.hbase.client.Table; 46 import org.apache.hadoop.hbase.filter.BinaryComparator; 47 import org.apache.hadoop.hbase.filter.CompareFilter; 48 import org.apache.hadoop.hbase.filter.Filter; 49 import org.apache.hadoop.hbase.filter.PageFilter; 50 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 51 import org.apache.hadoop.hbase.filter.WhileMatchFilter; 52 import org.apache.hadoop.hbase.io.compress.Compression; 53 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 54 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 55 import org.apache.hadoop.hbase.rest.client.Client; 56 import org.apache.hadoop.hbase.rest.client.Cluster; 57 import org.apache.hadoop.hbase.rest.client.RemoteAdmin; 58 import org.apache.hadoop.hbase.util.Bytes; 59 import org.apache.hadoop.hbase.util.Hash; 60 import org.apache.hadoop.hbase.util.MurmurHash; 61 import org.apache.hadoop.hbase.util.Pair; 62 import org.apache.hadoop.io.LongWritable; 63 import org.apache.hadoop.io.NullWritable; 64 import org.apache.hadoop.io.Text; 65 import org.apache.hadoop.io.Writable; 66 import org.apache.hadoop.mapreduce.InputSplit; 67 import org.apache.hadoop.mapreduce.Job; 68 import org.apache.hadoop.mapreduce.JobContext; 69 import org.apache.hadoop.mapreduce.Mapper; 70 import org.apache.hadoop.mapreduce.RecordReader; 71 import org.apache.hadoop.mapreduce.TaskAttemptContext; 72 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 73 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 74 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 75 import org.apache.hadoop.util.LineReader; 76 import org.apache.hadoop.util.Tool; 77 import org.apache.hadoop.util.ToolRunner; 78 79 import java.io.DataInput; 80 import java.io.DataOutput; 81 import java.io.IOException; 82 import java.io.PrintStream; 83 import java.lang.reflect.Constructor; 84 import java.text.SimpleDateFormat; 85 import java.util.ArrayList; 86 import java.util.Arrays; 87 import java.util.Date; 88 import java.util.List; 89 import java.util.Map; 90 import java.util.Random; 91 import java.util.TreeMap; 92 import java.util.regex.Matcher; 93 import java.util.regex.Pattern; 94 95 /** 96 * Script used evaluating Stargate performance and scalability. Runs a SG 97 * client that steps through one of a set of hardcoded tests or 'experiments' 98 * (e.g. a random reads test, a random writes test, etc.). Pass on the 99 * command-line which test to run and how many clients are participating in 100 * this experiment. Run <code>java PerformanceEvaluation --help</code> to 101 * obtain usage. 102 * 103 * <p>This class sets up and runs the evaluation programs described in 104 * Section 7, <i>Performance Evaluation</i>, of the <a 105 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a> 106 * paper, pages 8-10. 107 * 108 * <p>If number of clients > 1, we start up a MapReduce job. Each map task 109 * runs an individual client. Each client does about 1GB of data. 110 */ 111 public class PerformanceEvaluation extends Configured implements Tool { 112 protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); 113 114 private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; 115 private static final int ROW_LENGTH = 1000; 116 private static final int TAG_LENGTH = 256; 117 private static final int ONE_GB = 1024 * 1024 * 1000; 118 private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; 119 120 public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); 121 public static final byte [] FAMILY_NAME = Bytes.toBytes("info"); 122 public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data"); 123 private TableName tableName = TABLE_NAME; 124 125 protected HTableDescriptor TABLE_DESCRIPTOR; 126 protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>(); 127 protected static Cluster cluster = new Cluster(); 128 129 volatile Configuration conf; 130 private boolean nomapred = false; 131 private int N = 1; 132 private int R = ROWS_PER_GB; 133 private Compression.Algorithm compression = Compression.Algorithm.NONE; 134 private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 135 private boolean flushCommits = true; 136 private boolean writeToWAL = true; 137 private boolean inMemoryCF = false; 138 private int presplitRegions = 0; 139 private boolean useTags = false; 140 private int noOfTags = 1; 141 private Connection connection; 142 143 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 144 /** 145 * Regex to parse lines in input file passed to mapreduce task. 146 */ 147 public static final Pattern LINE_PATTERN = 148 Pattern.compile("tableName=(\\w+),\\s+" + 149 "startRow=(\\d+),\\s+" + 150 "perClientRunRows=(\\d+),\\s+" + 151 "totalRows=(\\d+),\\s+" + 152 "clients=(\\d+),\\s+" + 153 "flushCommits=(\\w+),\\s+" + 154 "writeToWAL=(\\w+),\\s+" + 155 "useTags=(\\w+),\\s+" + 156 "noOfTags=(\\d+)"); 157 158 /** 159 * Enum for map metrics. Keep it out here rather than inside in the Map 160 * inner-class so we can find associated properties. 161 */ 162 protected static enum Counter { 163 /** elapsed time */ 164 ELAPSED_TIME, 165 /** number of rows */ 166 ROWS} 167 168 /** 169 * Constructor 170 * @param c Configuration object 171 */ PerformanceEvaluation(final Configuration c)172 public PerformanceEvaluation(final Configuration c) { 173 this.conf = c; 174 175 addCommandDescriptor(RandomReadTest.class, "randomRead", 176 "Run random read test"); 177 addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", 178 "Run random seek and scan 100 test"); 179 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 180 "Run random seek scan with both start and stop row (max 10 rows)"); 181 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 182 "Run random seek scan with both start and stop row (max 100 rows)"); 183 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 184 "Run random seek scan with both start and stop row (max 1000 rows)"); 185 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 186 "Run random seek scan with both start and stop row (max 10000 rows)"); 187 addCommandDescriptor(RandomWriteTest.class, "randomWrite", 188 "Run random write test"); 189 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", 190 "Run sequential read test"); 191 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", 192 "Run sequential write test"); 193 addCommandDescriptor(ScanTest.class, "scan", 194 "Run scan test (read every row)"); 195 addCommandDescriptor(FilteredScanTest.class, "filterScan", 196 "Run scan test using a filter to find a specific row based " + 197 "on it's value (make sure to use --rows=20)"); 198 } 199 addCommandDescriptor(Class<? extends Test> cmdClass, String name, String description)200 protected void addCommandDescriptor(Class<? extends Test> cmdClass, 201 String name, String description) { 202 CmdDescriptor cmdDescriptor = 203 new CmdDescriptor(cmdClass, name, description); 204 commands.put(name, cmdDescriptor); 205 } 206 207 /** 208 * Implementations can have their status set. 209 */ 210 interface Status { 211 /** 212 * Sets status 213 * @param msg status message 214 * @throws IOException 215 */ setStatus(final String msg)216 void setStatus(final String msg) throws IOException; 217 } 218 219 /** 220 * This class works as the InputSplit of Performance Evaluation 221 * MapReduce InputFormat, and the Record Value of RecordReader. 222 * Each map task will only read one record from a PeInputSplit, 223 * the record value is the PeInputSplit itself. 224 */ 225 public static class PeInputSplit extends InputSplit implements Writable { 226 private TableName tableName = TABLE_NAME; 227 private int startRow = 0; 228 private int rows = 0; 229 private int totalRows = 0; 230 private int clients = 0; 231 private boolean flushCommits = false; 232 private boolean writeToWAL = true; 233 private boolean useTags = false; 234 private int noOfTags = 0; 235 PeInputSplit()236 public PeInputSplit() { 237 } 238 PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags)239 public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, 240 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { 241 this.tableName = tableName; 242 this.startRow = startRow; 243 this.rows = rows; 244 this.totalRows = totalRows; 245 this.clients = clients; 246 this.flushCommits = flushCommits; 247 this.writeToWAL = writeToWAL; 248 this.useTags = useTags; 249 this.noOfTags = noOfTags; 250 } 251 252 @Override readFields(DataInput in)253 public void readFields(DataInput in) throws IOException { 254 int tableNameLen = in.readInt(); 255 byte[] name = new byte[tableNameLen]; 256 in.readFully(name); 257 this.tableName = TableName.valueOf(name); 258 this.startRow = in.readInt(); 259 this.rows = in.readInt(); 260 this.totalRows = in.readInt(); 261 this.clients = in.readInt(); 262 this.flushCommits = in.readBoolean(); 263 this.writeToWAL = in.readBoolean(); 264 this.useTags = in.readBoolean(); 265 this.noOfTags = in.readInt(); 266 } 267 268 @Override write(DataOutput out)269 public void write(DataOutput out) throws IOException { 270 byte[] name = this.tableName.toBytes(); 271 out.writeInt(name.length); 272 out.write(name); 273 out.writeInt(startRow); 274 out.writeInt(rows); 275 out.writeInt(totalRows); 276 out.writeInt(clients); 277 out.writeBoolean(flushCommits); 278 out.writeBoolean(writeToWAL); 279 out.writeBoolean(useTags); 280 out.writeInt(noOfTags); 281 } 282 283 @Override getLength()284 public long getLength() throws IOException, InterruptedException { 285 return 0; 286 } 287 288 @Override getLocations()289 public String[] getLocations() throws IOException, InterruptedException { 290 return new String[0]; 291 } 292 getStartRow()293 public int getStartRow() { 294 return startRow; 295 } 296 getTableName()297 public TableName getTableName() { 298 return tableName; 299 } 300 getRows()301 public int getRows() { 302 return rows; 303 } 304 getTotalRows()305 public int getTotalRows() { 306 return totalRows; 307 } 308 getClients()309 public int getClients() { 310 return clients; 311 } 312 isFlushCommits()313 public boolean isFlushCommits() { 314 return flushCommits; 315 } 316 isWriteToWAL()317 public boolean isWriteToWAL() { 318 return writeToWAL; 319 } 320 isUseTags()321 public boolean isUseTags() { 322 return useTags; 323 } 324 getNoOfTags()325 public int getNoOfTags() { 326 return noOfTags; 327 } 328 } 329 330 /** 331 * InputFormat of Performance Evaluation MapReduce job. 332 * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). 333 */ 334 public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> { 335 336 @Override getSplits(JobContext job)337 public List<InputSplit> getSplits(JobContext job) throws IOException { 338 // generate splits 339 List<InputSplit> splitList = new ArrayList<InputSplit>(); 340 341 for (FileStatus file: listStatus(job)) { 342 if (file.isDirectory()) { 343 continue; 344 } 345 Path path = file.getPath(); 346 FileSystem fs = path.getFileSystem(job.getConfiguration()); 347 FSDataInputStream fileIn = fs.open(path); 348 LineReader in = new LineReader(fileIn, job.getConfiguration()); 349 int lineLen = 0; 350 while(true) { 351 Text lineText = new Text(); 352 lineLen = in.readLine(lineText); 353 if(lineLen <= 0) { 354 break; 355 } 356 Matcher m = LINE_PATTERN.matcher(lineText.toString()); 357 if((m != null) && m.matches()) { 358 TableName tableName = TableName.valueOf(m.group(1)); 359 int startRow = Integer.parseInt(m.group(2)); 360 int rows = Integer.parseInt(m.group(3)); 361 int totalRows = Integer.parseInt(m.group(4)); 362 int clients = Integer.parseInt(m.group(5)); 363 boolean flushCommits = Boolean.parseBoolean(m.group(6)); 364 boolean writeToWAL = Boolean.parseBoolean(m.group(7)); 365 boolean useTags = Boolean.parseBoolean(m.group(8)); 366 int noOfTags = Integer.parseInt(m.group(9)); 367 368 LOG.debug("tableName=" + tableName + 369 " split["+ splitList.size() + "] " + 370 " startRow=" + startRow + 371 " rows=" + rows + 372 " totalRows=" + totalRows + 373 " clients=" + clients + 374 " flushCommits=" + flushCommits + 375 " writeToWAL=" + writeToWAL + 376 " useTags=" + useTags + 377 " noOfTags=" + noOfTags); 378 379 PeInputSplit newSplit = 380 new PeInputSplit(tableName, startRow, rows, totalRows, clients, 381 flushCommits, writeToWAL, useTags, noOfTags); 382 splitList.add(newSplit); 383 } 384 } 385 in.close(); 386 } 387 388 LOG.info("Total # of splits: " + splitList.size()); 389 return splitList; 390 } 391 392 @Override createRecordReader(InputSplit split, TaskAttemptContext context)393 public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split, 394 TaskAttemptContext context) { 395 return new PeRecordReader(); 396 } 397 398 public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> { 399 private boolean readOver = false; 400 private PeInputSplit split = null; 401 private NullWritable key = null; 402 private PeInputSplit value = null; 403 404 @Override initialize(InputSplit split, TaskAttemptContext context)405 public void initialize(InputSplit split, TaskAttemptContext context) 406 throws IOException, InterruptedException { 407 this.readOver = false; 408 this.split = (PeInputSplit)split; 409 } 410 411 @Override nextKeyValue()412 public boolean nextKeyValue() throws IOException, InterruptedException { 413 if(readOver) { 414 return false; 415 } 416 417 key = NullWritable.get(); 418 value = (PeInputSplit)split; 419 420 readOver = true; 421 return true; 422 } 423 424 @Override getCurrentKey()425 public NullWritable getCurrentKey() throws IOException, InterruptedException { 426 return key; 427 } 428 429 @Override getCurrentValue()430 public PeInputSplit getCurrentValue() throws IOException, InterruptedException { 431 return value; 432 } 433 434 @Override getProgress()435 public float getProgress() throws IOException, InterruptedException { 436 if(readOver) { 437 return 1.0f; 438 } else { 439 return 0.0f; 440 } 441 } 442 443 @Override close()444 public void close() throws IOException { 445 // do nothing 446 } 447 } 448 } 449 450 /** 451 * MapReduce job that runs a performance evaluation client in each map task. 452 */ 453 public static class EvaluationMapTask 454 extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> { 455 456 /** configuration parameter name that contains the command */ 457 public final static String CMD_KEY = "EvaluationMapTask.command"; 458 /** configuration parameter name that contains the PE impl */ 459 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 460 461 private Class<? extends Test> cmd; 462 private PerformanceEvaluation pe; 463 464 @Override setup(Context context)465 protected void setup(Context context) throws IOException, InterruptedException { 466 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 467 468 // this is required so that extensions of PE are instantiated within the 469 // map reduce task... 470 Class<? extends PerformanceEvaluation> peClass = 471 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 472 try { 473 this.pe = peClass.getConstructor(Configuration.class) 474 .newInstance(context.getConfiguration()); 475 } catch (Exception e) { 476 throw new IllegalStateException("Could not instantiate PE instance", e); 477 } 478 } 479 forName(String className, Class<Type> type)480 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 481 Class<? extends Type> clazz = null; 482 try { 483 clazz = Class.forName(className).asSubclass(type); 484 } catch (ClassNotFoundException e) { 485 throw new IllegalStateException("Could not find class for name: " + className, e); 486 } 487 return clazz; 488 } 489 map(NullWritable key, PeInputSplit value, final Context context)490 protected void map(NullWritable key, PeInputSplit value, final Context context) 491 throws IOException, InterruptedException { 492 493 Status status = new Status() { 494 public void setStatus(String msg) { 495 context.setStatus(msg); 496 } 497 }; 498 499 // Evaluation task 500 pe.tableName = value.getTableName(); 501 long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), 502 value.getRows(), value.getTotalRows(), 503 value.isFlushCommits(), value.isWriteToWAL(), 504 value.isUseTags(), value.getNoOfTags(), 505 ConnectionFactory.createConnection(context.getConfiguration()), status); 506 // Collect how much time the thing took. Report as map output and 507 // to the ELAPSED_TIME counter. 508 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); 509 context.getCounter(Counter.ROWS).increment(value.rows); 510 context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); 511 context.progress(); 512 } 513 } 514 515 /* 516 * If table does not already exist, create. 517 * @param c Client to use checking. 518 * @return True if we created the table. 519 * @throws IOException 520 */ checkTable(RemoteAdmin admin)521 private boolean checkTable(RemoteAdmin admin) throws IOException { 522 HTableDescriptor tableDescriptor = getTableDescriptor(); 523 if (this.presplitRegions > 0) { 524 // presplit requested 525 if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) { 526 admin.deleteTable(tableDescriptor.getTableName().getName()); 527 } 528 529 byte[][] splits = getSplits(); 530 for (int i=0; i < splits.length; i++) { 531 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 532 } 533 admin.createTable(tableDescriptor); 534 LOG.info ("Table created with " + this.presplitRegions + " splits"); 535 } else { 536 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 537 if (!tableExists) { 538 admin.createTable(tableDescriptor); 539 LOG.info("Table " + tableDescriptor + " created"); 540 } 541 } 542 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 543 return tableExists; 544 } 545 getTableDescriptor()546 protected HTableDescriptor getTableDescriptor() { 547 if (TABLE_DESCRIPTOR == null) { 548 TABLE_DESCRIPTOR = new HTableDescriptor(tableName); 549 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); 550 family.setDataBlockEncoding(blockEncoding); 551 family.setCompressionType(compression); 552 if (inMemoryCF) { 553 family.setInMemory(true); 554 } 555 TABLE_DESCRIPTOR.addFamily(family); 556 } 557 return TABLE_DESCRIPTOR; 558 } 559 560 /** 561 * Generates splits based on total number of rows and specified split regions 562 * 563 * @return splits : array of byte [] 564 */ getSplits()565 protected byte[][] getSplits() { 566 if (this.presplitRegions == 0) 567 return new byte [0][]; 568 569 int numSplitPoints = presplitRegions - 1; 570 byte[][] splits = new byte[numSplitPoints][]; 571 int jump = this.R / this.presplitRegions; 572 for (int i=0; i < numSplitPoints; i++) { 573 int rowkey = jump * (1 + i); 574 splits[i] = format(rowkey); 575 } 576 return splits; 577 } 578 579 /* 580 * We're to run multiple clients concurrently. Setup a mapreduce job. Run 581 * one map per client. Then run a single reduce to sum the elapsed times. 582 * @param cmd Command to run. 583 * @throws IOException 584 */ runNIsMoreThanOne(final Class<? extends Test> cmd)585 private void runNIsMoreThanOne(final Class<? extends Test> cmd) 586 throws IOException, InterruptedException, ClassNotFoundException { 587 RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf()); 588 checkTable(remoteAdmin); 589 if (nomapred) { 590 doMultipleClients(cmd); 591 } else { 592 doMapReduce(cmd); 593 } 594 } 595 596 /* 597 * Run all clients in this vm each to its own thread. 598 * @param cmd Command to run. 599 * @throws IOException 600 */ doMultipleClients(final Class<? extends Test> cmd)601 private void doMultipleClients(final Class<? extends Test> cmd) throws IOException { 602 final List<Thread> threads = new ArrayList<Thread>(this.N); 603 final long[] timings = new long[this.N]; 604 final int perClientRows = R/N; 605 final TableName tableName = this.tableName; 606 final DataBlockEncoding encoding = this.blockEncoding; 607 final boolean flushCommits = this.flushCommits; 608 final Compression.Algorithm compression = this.compression; 609 final boolean writeToWal = this.writeToWAL; 610 final int preSplitRegions = this.presplitRegions; 611 final boolean useTags = this.useTags; 612 final int numTags = this.noOfTags; 613 final Connection connection = ConnectionFactory.createConnection(getConf()); 614 for (int i = 0; i < this.N; i++) { 615 final int index = i; 616 Thread t = new Thread ("TestClient-" + i) { 617 @Override 618 public void run() { 619 super.run(); 620 PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); 621 pe.tableName = tableName; 622 pe.blockEncoding = encoding; 623 pe.flushCommits = flushCommits; 624 pe.compression = compression; 625 pe.writeToWAL = writeToWal; 626 pe.presplitRegions = preSplitRegions; 627 pe.N = N; 628 pe.connection = connection; 629 pe.useTags = useTags; 630 pe.noOfTags = numTags; 631 try { 632 long elapsedTime = pe.runOneClient(cmd, index * perClientRows, 633 perClientRows, R, 634 flushCommits, writeToWAL, useTags, noOfTags, connection, new Status() { 635 public void setStatus(final String msg) throws IOException { 636 LOG.info("client-" + getName() + " " + msg); 637 } 638 }); 639 timings[index] = elapsedTime; 640 LOG.info("Finished " + getName() + " in " + elapsedTime + 641 "ms writing " + perClientRows + " rows"); 642 } catch (IOException e) { 643 throw new RuntimeException(e); 644 } 645 } 646 }; 647 threads.add(t); 648 } 649 for (Thread t: threads) { 650 t.start(); 651 } 652 for (Thread t: threads) { 653 while(t.isAlive()) { 654 try { 655 t.join(); 656 } catch (InterruptedException e) { 657 LOG.debug("Interrupted, continuing" + e.toString()); 658 } 659 } 660 } 661 final String test = cmd.getSimpleName(); 662 LOG.info("[" + test + "] Summary of timings (ms): " 663 + Arrays.toString(timings)); 664 Arrays.sort(timings); 665 long total = 0; 666 for (int i = 0; i < this.N; i++) { 667 total += timings[i]; 668 } 669 LOG.info("[" + test + "]" 670 + "\tMin: " + timings[0] + "ms" 671 + "\tMax: " + timings[this.N - 1] + "ms" 672 + "\tAvg: " + (total / this.N) + "ms"); 673 } 674 675 /* 676 * Run a mapreduce job. Run as many maps as asked-for clients. 677 * Before we start up the job, write out an input file with instruction 678 * per client regards which row they are to start on. 679 * @param cmd Command to run. 680 * @throws IOException 681 */ doMapReduce(final Class<? extends Test> cmd)682 private void doMapReduce(final Class<? extends Test> cmd) throws IOException, 683 InterruptedException, ClassNotFoundException { 684 Configuration conf = getConf(); 685 Path inputDir = writeInputFile(conf); 686 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 687 conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); 688 Job job = Job.getInstance(conf); 689 job.setJarByClass(PerformanceEvaluation.class); 690 job.setJobName("HBase Performance Evaluation"); 691 692 job.setInputFormatClass(PeInputFormat.class); 693 PeInputFormat.setInputPaths(job, inputDir); 694 695 job.setOutputKeyClass(LongWritable.class); 696 job.setOutputValueClass(LongWritable.class); 697 698 job.setMapperClass(EvaluationMapTask.class); 699 job.setReducerClass(LongSumReducer.class); 700 job.setNumReduceTasks(1); 701 702 job.setOutputFormatClass(TextOutputFormat.class); 703 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 704 TableMapReduceUtil.addDependencyJars(job); 705 TableMapReduceUtil.initCredentials(job); 706 job.waitForCompletion(true); 707 } 708 709 /* 710 * Write input file of offsets-per-client for the mapreduce job. 711 * @param c Configuration 712 * @return Directory that contains file written. 713 * @throws IOException 714 */ writeInputFile(final Configuration c)715 private Path writeInputFile(final Configuration c) throws IOException { 716 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 717 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); 718 Path inputDir = new Path(jobdir, "inputs"); 719 720 FileSystem fs = FileSystem.get(c); 721 fs.mkdirs(inputDir); 722 Path inputFile = new Path(inputDir, "input.txt"); 723 PrintStream out = new PrintStream(fs.create(inputFile)); 724 // Make input random. 725 Map<Integer, String> m = new TreeMap<Integer, String>(); 726 Hash h = MurmurHash.getInstance(); 727 int perClientRows = (this.R / this.N); 728 try { 729 for (int i = 0; i < 10; i++) { 730 for (int j = 0; j < N; j++) { 731 String s = "tableName=" + this.tableName + 732 ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + 733 ", perClientRunRows=" + (perClientRows / 10) + 734 ", totalRows=" + this.R + 735 ", clients=" + this.N + 736 ", flushCommits=" + this.flushCommits + 737 ", writeToWAL=" + this.writeToWAL + 738 ", useTags=" + this.useTags + 739 ", noOfTags=" + this.noOfTags; 740 int hash = h.hash(Bytes.toBytes(s)); 741 m.put(hash, s); 742 } 743 } 744 for (Map.Entry<Integer, String> e: m.entrySet()) { 745 out.println(e.getValue()); 746 } 747 } finally { 748 out.close(); 749 } 750 return inputDir; 751 } 752 753 /** 754 * Describes a command. 755 */ 756 static class CmdDescriptor { 757 private Class<? extends Test> cmdClass; 758 private String name; 759 private String description; 760 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description)761 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) { 762 this.cmdClass = cmdClass; 763 this.name = name; 764 this.description = description; 765 } 766 getCmdClass()767 public Class<? extends Test> getCmdClass() { 768 return cmdClass; 769 } 770 getName()771 public String getName() { 772 return name; 773 } 774 getDescription()775 public String getDescription() { 776 return description; 777 } 778 } 779 780 /** 781 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test 782 * tests}. This makes the reflection logic a little easier to understand... 783 */ 784 static class TestOptions { 785 private int startRow; 786 private int perClientRunRows; 787 private int totalRows; 788 private int numClientThreads; 789 private TableName tableName; 790 private boolean flushCommits; 791 private boolean writeToWAL = true; 792 private boolean useTags = false; 793 private int noOfTags = 0; 794 private Connection connection; 795 TestOptions()796 TestOptions() { 797 } 798 TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, Connection connection)799 TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, 800 TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, 801 int noOfTags, Connection connection) { 802 this.startRow = startRow; 803 this.perClientRunRows = perClientRunRows; 804 this.totalRows = totalRows; 805 this.numClientThreads = numClientThreads; 806 this.tableName = tableName; 807 this.flushCommits = flushCommits; 808 this.writeToWAL = writeToWAL; 809 this.useTags = useTags; 810 this.noOfTags = noOfTags; 811 this.connection = connection; 812 } 813 getStartRow()814 public int getStartRow() { 815 return startRow; 816 } 817 getPerClientRunRows()818 public int getPerClientRunRows() { 819 return perClientRunRows; 820 } 821 getTotalRows()822 public int getTotalRows() { 823 return totalRows; 824 } 825 getNumClientThreads()826 public int getNumClientThreads() { 827 return numClientThreads; 828 } 829 getTableName()830 public TableName getTableName() { 831 return tableName; 832 } 833 isFlushCommits()834 public boolean isFlushCommits() { 835 return flushCommits; 836 } 837 isWriteToWAL()838 public boolean isWriteToWAL() { 839 return writeToWAL; 840 } 841 getConnection()842 public Connection getConnection() { 843 return connection; 844 } 845 isUseTags()846 public boolean isUseTags() { 847 return this.useTags; 848 } 849 getNumTags()850 public int getNumTags() { 851 return this.noOfTags; 852 } 853 } 854 855 /* 856 * A test. 857 * Subclass to particularize what happens per row. 858 */ 859 static abstract class Test { 860 // Below is make it so when Tests are all running in the one 861 // jvm, that they each have a differently seeded Random. 862 private static final Random randomSeed = 863 new Random(System.currentTimeMillis()); nextRandomSeed()864 private static long nextRandomSeed() { 865 return randomSeed.nextLong(); 866 } 867 protected final Random rand = new Random(nextRandomSeed()); 868 869 protected final int startRow; 870 protected final int perClientRunRows; 871 protected final int totalRows; 872 private final Status status; 873 protected TableName tableName; 874 protected volatile Configuration conf; 875 protected boolean writeToWAL; 876 protected boolean useTags; 877 protected int noOfTags; 878 protected Connection connection; 879 880 /** 881 * Note that all subclasses of this class must provide a public contructor 882 * that has the exact same list of arguments. 883 */ Test(final Configuration conf, final TestOptions options, final Status status)884 Test(final Configuration conf, final TestOptions options, final Status status) { 885 super(); 886 this.startRow = options.getStartRow(); 887 this.perClientRunRows = options.getPerClientRunRows(); 888 this.totalRows = options.getTotalRows(); 889 this.status = status; 890 this.tableName = options.getTableName(); 891 this.conf = conf; 892 this.writeToWAL = options.isWriteToWAL(); 893 this.useTags = options.isUseTags(); 894 this.noOfTags = options.getNumTags(); 895 this.connection = options.getConnection(); 896 } 897 generateStatus(final int sr, final int i, final int lr)898 protected String generateStatus(final int sr, final int i, final int lr) { 899 return sr + "/" + i + "/" + lr; 900 } 901 getReportingPeriod()902 protected int getReportingPeriod() { 903 int period = this.perClientRunRows / 10; 904 return period == 0? this.perClientRunRows: period; 905 } 906 testTakedown()907 abstract void testTakedown() throws IOException; 908 /* 909 * Run test 910 * @return Elapsed time. 911 * @throws IOException 912 */ test()913 long test() throws IOException { 914 testSetup(); 915 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 916 final long startTime = System.nanoTime(); 917 try { 918 testTimed(); 919 } finally { 920 testTakedown(); 921 } 922 return (System.nanoTime() - startTime) / 1000000; 923 } 924 testSetup()925 abstract void testSetup() throws IOException; 926 927 /** 928 * Provides an extension point for tests that don't want a per row invocation. 929 */ testTimed()930 void testTimed() throws IOException { 931 int lastRow = this.startRow + this.perClientRunRows; 932 // Report on completion of 1/10th of total. 933 for (int i = this.startRow; i < lastRow; i++) { 934 testRow(i); 935 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 936 status.setStatus(generateStatus(this.startRow, i, lastRow)); 937 } 938 } 939 } 940 941 /* 942 * Test for individual row. 943 * @param i Row index. 944 */ testRow(final int i)945 abstract void testRow(final int i) throws IOException; 946 } 947 948 static abstract class TableTest extends Test { 949 protected Table table; 950 TableTest(Configuration conf, TestOptions options, Status status)951 public TableTest(Configuration conf, TestOptions options, Status status) { 952 super(conf, options, status); 953 } 954 testSetup()955 void testSetup() throws IOException { 956 this.table = connection.getTable(tableName); 957 } 958 959 @Override testTakedown()960 void testTakedown() throws IOException { 961 table.close(); 962 } 963 } 964 965 static abstract class BufferedMutatorTest extends Test { 966 protected BufferedMutator mutator; 967 protected boolean flushCommits; 968 BufferedMutatorTest(Configuration conf, TestOptions options, Status status)969 public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { 970 super(conf, options, status); 971 this.flushCommits = options.isFlushCommits(); 972 } 973 testSetup()974 void testSetup() throws IOException { 975 this.mutator = connection.getBufferedMutator(tableName); 976 } 977 testTakedown()978 void testTakedown() throws IOException { 979 if (flushCommits) { 980 this.mutator.flush(); 981 } 982 mutator.close(); 983 } 984 } 985 986 static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Configuration conf, TestOptions options, Status status)987 RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { 988 super(conf, options, status); 989 } 990 991 @Override testRow(final int i)992 void testRow(final int i) throws IOException { 993 Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); 994 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 995 scan.setFilter(new WhileMatchFilter(new PageFilter(120))); 996 ResultScanner s = this.table.getScanner(scan); 997 s.close(); 998 } 999 1000 @Override getReportingPeriod()1001 protected int getReportingPeriod() { 1002 int period = this.perClientRunRows / 100; 1003 return period == 0? this.perClientRunRows: period; 1004 } 1005 1006 } 1007 1008 @SuppressWarnings("unused") 1009 static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status)1010 RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { 1011 super(conf, options, status); 1012 } 1013 1014 @Override testRow(final int i)1015 void testRow(final int i) throws IOException { 1016 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1017 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); 1018 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1019 ResultScanner s = this.table.getScanner(scan); 1020 int count = 0; 1021 for (Result rr = null; (rr = s.next()) != null;) { 1022 count++; 1023 } 1024 1025 if (i % 100 == 0) { 1026 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1027 Bytes.toString(startAndStopRow.getFirst()), 1028 Bytes.toString(startAndStopRow.getSecond()), count)); 1029 } 1030 1031 s.close(); 1032 } 1033 getStartAndStopRow()1034 protected abstract Pair<byte[],byte[]> getStartAndStopRow(); 1035 generateStartAndStopRows(int maxRange)1036 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1037 int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; 1038 int stop = start + maxRange; 1039 return new Pair<byte[],byte[]>(format(start), format(stop)); 1040 } 1041 1042 @Override getReportingPeriod()1043 protected int getReportingPeriod() { 1044 int period = this.perClientRunRows / 100; 1045 return period == 0? this.perClientRunRows: period; 1046 } 1047 } 1048 1049 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status)1050 RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) { 1051 super(conf, options, status); 1052 } 1053 1054 @Override getStartAndStopRow()1055 protected Pair<byte[], byte[]> getStartAndStopRow() { 1056 return generateStartAndStopRows(10); 1057 } 1058 } 1059 1060 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status)1061 RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) { 1062 super(conf, options, status); 1063 } 1064 1065 @Override getStartAndStopRow()1066 protected Pair<byte[], byte[]> getStartAndStopRow() { 1067 return generateStartAndStopRows(100); 1068 } 1069 } 1070 1071 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status)1072 RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) { 1073 super(conf, options, status); 1074 } 1075 1076 @Override getStartAndStopRow()1077 protected Pair<byte[], byte[]> getStartAndStopRow() { 1078 return generateStartAndStopRows(1000); 1079 } 1080 } 1081 1082 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status)1083 RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) { 1084 super(conf, options, status); 1085 } 1086 1087 @Override getStartAndStopRow()1088 protected Pair<byte[], byte[]> getStartAndStopRow() { 1089 return generateStartAndStopRows(10000); 1090 } 1091 } 1092 1093 static class RandomReadTest extends TableTest { RandomReadTest(Configuration conf, TestOptions options, Status status)1094 RandomReadTest(Configuration conf, TestOptions options, Status status) { 1095 super(conf, options, status); 1096 } 1097 1098 @Override testRow(final int i)1099 void testRow(final int i) throws IOException { 1100 Get get = new Get(getRandomRow(this.rand, this.totalRows)); 1101 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1102 this.table.get(get); 1103 } 1104 1105 @Override getReportingPeriod()1106 protected int getReportingPeriod() { 1107 int period = this.perClientRunRows / 100; 1108 return period == 0? this.perClientRunRows: period; 1109 } 1110 1111 } 1112 1113 static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Configuration conf, TestOptions options, Status status)1114 RandomWriteTest(Configuration conf, TestOptions options, Status status) { 1115 super(conf, options, status); 1116 } 1117 1118 @Override testRow(final int i)1119 void testRow(final int i) throws IOException { 1120 byte[] row = getRandomRow(this.rand, this.totalRows); 1121 Put put = new Put(row); 1122 byte[] value = generateData(this.rand, ROW_LENGTH); 1123 if (useTags) { 1124 byte[] tag = generateData(this.rand, TAG_LENGTH); 1125 Tag[] tags = new Tag[noOfTags]; 1126 for (int n = 0; n < noOfTags; n++) { 1127 Tag t = new Tag((byte) n, tag); 1128 tags[n] = t; 1129 } 1130 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1131 value, tags); 1132 put.add(kv); 1133 } else { 1134 put.add(FAMILY_NAME, QUALIFIER_NAME, value); 1135 } 1136 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1137 mutator.mutate(put); 1138 } 1139 } 1140 1141 static class ScanTest extends TableTest { 1142 private ResultScanner testScanner; 1143 ScanTest(Configuration conf, TestOptions options, Status status)1144 ScanTest(Configuration conf, TestOptions options, Status status) { 1145 super(conf, options, status); 1146 } 1147 1148 @Override testTakedown()1149 void testTakedown() throws IOException { 1150 if (this.testScanner != null) { 1151 this.testScanner.close(); 1152 } 1153 super.testTakedown(); 1154 } 1155 1156 1157 @Override testRow(final int i)1158 void testRow(final int i) throws IOException { 1159 if (this.testScanner == null) { 1160 Scan scan = new Scan(format(this.startRow)); 1161 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1162 this.testScanner = table.getScanner(scan); 1163 } 1164 testScanner.next(); 1165 } 1166 1167 } 1168 1169 static class SequentialReadTest extends TableTest { SequentialReadTest(Configuration conf, TestOptions options, Status status)1170 SequentialReadTest(Configuration conf, TestOptions options, Status status) { 1171 super(conf, options, status); 1172 } 1173 1174 @Override testRow(final int i)1175 void testRow(final int i) throws IOException { 1176 Get get = new Get(format(i)); 1177 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1178 table.get(get); 1179 } 1180 1181 } 1182 1183 static class SequentialWriteTest extends BufferedMutatorTest { 1184 SequentialWriteTest(Configuration conf, TestOptions options, Status status)1185 SequentialWriteTest(Configuration conf, TestOptions options, Status status) { 1186 super(conf, options, status); 1187 } 1188 1189 @Override testRow(final int i)1190 void testRow(final int i) throws IOException { 1191 byte[] row = format(i); 1192 Put put = new Put(row); 1193 byte[] value = generateData(this.rand, ROW_LENGTH); 1194 if (useTags) { 1195 byte[] tag = generateData(this.rand, TAG_LENGTH); 1196 Tag[] tags = new Tag[noOfTags]; 1197 for (int n = 0; n < noOfTags; n++) { 1198 Tag t = new Tag((byte) n, tag); 1199 tags[n] = t; 1200 } 1201 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1202 value, tags); 1203 put.add(kv); 1204 } else { 1205 put.add(FAMILY_NAME, QUALIFIER_NAME, value); 1206 } 1207 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1208 mutator.mutate(put); 1209 } 1210 } 1211 1212 static class FilteredScanTest extends TableTest { 1213 protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); 1214 FilteredScanTest(Configuration conf, TestOptions options, Status status)1215 FilteredScanTest(Configuration conf, TestOptions options, Status status) { 1216 super(conf, options, status); 1217 } 1218 1219 @Override testRow(int i)1220 void testRow(int i) throws IOException { 1221 byte[] value = generateValue(this.rand); 1222 Scan scan = constructScan(value); 1223 ResultScanner scanner = null; 1224 try { 1225 scanner = this.table.getScanner(scan); 1226 while (scanner.next() != null) { 1227 } 1228 } finally { 1229 if (scanner != null) scanner.close(); 1230 } 1231 } 1232 constructScan(byte[] valuePrefix)1233 protected Scan constructScan(byte[] valuePrefix) throws IOException { 1234 Filter filter = new SingleColumnValueFilter( 1235 FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL, 1236 new BinaryComparator(valuePrefix) 1237 ); 1238 Scan scan = new Scan(); 1239 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1240 scan.setFilter(filter); 1241 return scan; 1242 } 1243 } 1244 1245 /* 1246 * Format passed integer. 1247 * @param number 1248 * @return Returns zero-prefixed 10-byte wide decimal version of passed 1249 * number (Does absolute in case number is negative). 1250 */ format(final int number)1251 public static byte [] format(final int number) { 1252 byte [] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; 1253 int d = Math.abs(number); 1254 for (int i = b.length - 1; i >= 0; i--) { 1255 b[i] = (byte)((d % 10) + '0'); 1256 d /= 10; 1257 } 1258 return b; 1259 } 1260 generateData(final Random r, int length)1261 public static byte[] generateData(final Random r, int length) { 1262 byte [] b = new byte [length]; 1263 int i = 0; 1264 1265 for(i = 0; i < (length-8); i += 8) { 1266 b[i] = (byte) (65 + r.nextInt(26)); 1267 b[i+1] = b[i]; 1268 b[i+2] = b[i]; 1269 b[i+3] = b[i]; 1270 b[i+4] = b[i]; 1271 b[i+5] = b[i]; 1272 b[i+6] = b[i]; 1273 b[i+7] = b[i]; 1274 } 1275 1276 byte a = (byte) (65 + r.nextInt(26)); 1277 for(; i < length; i++) { 1278 b[i] = a; 1279 } 1280 return b; 1281 } 1282 generateValue(final Random r)1283 public static byte[] generateValue(final Random r) { 1284 byte [] b = new byte [ROW_LENGTH]; 1285 r.nextBytes(b); 1286 return b; 1287 } 1288 getRandomRow(final Random random, final int totalRows)1289 static byte [] getRandomRow(final Random random, final int totalRows) { 1290 return format(random.nextInt(Integer.MAX_VALUE) % totalRows); 1291 } 1292 runOneClient(final Class<? extends Test> cmd, final int startRow, final int perClientRunRows, final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, Connection connection, final Status status)1293 long runOneClient(final Class<? extends Test> cmd, final int startRow, 1294 final int perClientRunRows, final int totalRows, 1295 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, 1296 Connection connection, final Status status) 1297 throws IOException { 1298 status.setStatus("Start " + cmd + " at offset " + startRow + " for " + 1299 perClientRunRows + " rows"); 1300 long totalElapsedTime = 0; 1301 1302 TestOptions options = new TestOptions(startRow, perClientRunRows, 1303 totalRows, N, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection); 1304 final Test t; 1305 try { 1306 Constructor<? extends Test> constructor = cmd.getDeclaredConstructor( 1307 Configuration.class, TestOptions.class, Status.class); 1308 t = constructor.newInstance(this.conf, options, status); 1309 } catch (NoSuchMethodException e) { 1310 throw new IllegalArgumentException("Invalid command class: " + 1311 cmd.getName() + ". It does not provide a constructor as described by" + 1312 "the javadoc comment. Available constructors are: " + 1313 Arrays.toString(cmd.getConstructors())); 1314 } catch (Exception e) { 1315 throw new IllegalStateException("Failed to construct command class", e); 1316 } 1317 totalElapsedTime = t.test(); 1318 1319 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 1320 "ms at offset " + startRow + " for " + perClientRunRows + " rows"); 1321 return totalElapsedTime; 1322 } 1323 runNIsOne(final Class<? extends Test> cmd)1324 private void runNIsOne(final Class<? extends Test> cmd) { 1325 Status status = new Status() { 1326 public void setStatus(String msg) throws IOException { 1327 LOG.info(msg); 1328 } 1329 }; 1330 1331 RemoteAdmin admin = null; 1332 try { 1333 Client client = new Client(cluster); 1334 admin = new RemoteAdmin(client, getConf()); 1335 checkTable(admin); 1336 runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, 1337 this.useTags, this.noOfTags, this.connection, status); 1338 } catch (Exception e) { 1339 LOG.error("Failed", e); 1340 } 1341 } 1342 runTest(final Class<? extends Test> cmd)1343 private void runTest(final Class<? extends Test> cmd) throws IOException, 1344 InterruptedException, ClassNotFoundException { 1345 if (N == 1) { 1346 // If there is only one client and one HRegionServer, we assume nothing 1347 // has been set up at all. 1348 runNIsOne(cmd); 1349 } else { 1350 // Else, run 1351 runNIsMoreThanOne(cmd); 1352 } 1353 } 1354 printUsage()1355 protected void printUsage() { 1356 printUsage(null); 1357 } 1358 printUsage(final String message)1359 protected void printUsage(final String message) { 1360 if (message != null && message.length() > 0) { 1361 System.err.println(message); 1362 } 1363 System.err.println("Usage: java " + this.getClass().getName() + " \\"); 1364 System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); 1365 System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " + 1366 "[-D<property=value>]* <command> <nclients>"); 1367 System.err.println(); 1368 System.err.println("Options:"); 1369 System.err.println(" nomapred Run multiple clients using threads " + 1370 "(rather than use mapreduce)"); 1371 System.err.println(" rows Rows each client runs. Default: One million"); 1372 System.err.println(" table Alternate table name. Default: 'TestTable'"); 1373 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 1374 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 1375 "Default: false"); 1376 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 1377 System.err.println(" presplit Create presplit table. Recommended for accurate perf " + 1378 "analysis (see guide). Default: disabled"); 1379 System.err.println(" inmemory Tries to keep the HFiles of the CF inmemory as far as " + 1380 "possible. Not guaranteed that reads are always served from inmemory. Default: false"); 1381 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 1382 "Default : false"); 1383 System.err.println(" numoftags Specify the no of tags that would be needed. " + 1384 "This works only if usetags is true."); 1385 System.err.println(); 1386 System.err.println(" Note: -D properties will be applied to the conf used. "); 1387 System.err.println(" For example: "); 1388 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 1389 System.err.println(" -Dmapreduce.task.timeout=60000"); 1390 System.err.println(); 1391 System.err.println("Command:"); 1392 for (CmdDescriptor command : commands.values()) { 1393 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); 1394 } 1395 System.err.println(); 1396 System.err.println("Args:"); 1397 System.err.println(" nclients Integer. Required. Total number of " + 1398 "clients (and HRegionServers)"); 1399 System.err.println(" running: 1 <= value <= 500"); 1400 System.err.println("Examples:"); 1401 System.err.println(" To run a single evaluation client:"); 1402 System.err.println(" $ bin/hbase " + this.getClass().getName() 1403 + " sequentialWrite 1"); 1404 } 1405 getArgs(final int start, final String[] args)1406 private void getArgs(final int start, final String[] args) { 1407 if(start + 1 > args.length) { 1408 throw new IllegalArgumentException("must supply the number of clients"); 1409 } 1410 N = Integer.parseInt(args[start]); 1411 if (N < 1) { 1412 throw new IllegalArgumentException("Number of clients must be > 1"); 1413 } 1414 // Set total number of rows to write. 1415 R = R * N; 1416 } 1417 1418 @Override run(String[] args)1419 public int run(String[] args) throws Exception { 1420 // Process command-line args. TODO: Better cmd-line processing 1421 // (but hopefully something not as painful as cli options). 1422 int errCode = -1; 1423 if (args.length < 1) { 1424 printUsage(); 1425 return errCode; 1426 } 1427 1428 try { 1429 for (int i = 0; i < args.length; i++) { 1430 String cmd = args[i]; 1431 if (cmd.equals("-h") || cmd.startsWith("--h")) { 1432 printUsage(); 1433 errCode = 0; 1434 break; 1435 } 1436 1437 final String nmr = "--nomapred"; 1438 if (cmd.startsWith(nmr)) { 1439 nomapred = true; 1440 continue; 1441 } 1442 1443 final String rows = "--rows="; 1444 if (cmd.startsWith(rows)) { 1445 R = Integer.parseInt(cmd.substring(rows.length())); 1446 continue; 1447 } 1448 1449 final String table = "--table="; 1450 if (cmd.startsWith(table)) { 1451 this.tableName = TableName.valueOf(cmd.substring(table.length())); 1452 continue; 1453 } 1454 1455 final String compress = "--compress="; 1456 if (cmd.startsWith(compress)) { 1457 this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 1458 continue; 1459 } 1460 1461 final String blockEncoding = "--blockEncoding="; 1462 if (cmd.startsWith(blockEncoding)) { 1463 this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 1464 continue; 1465 } 1466 1467 final String flushCommits = "--flushCommits="; 1468 if (cmd.startsWith(flushCommits)) { 1469 this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 1470 continue; 1471 } 1472 1473 final String writeToWAL = "--writeToWAL="; 1474 if (cmd.startsWith(writeToWAL)) { 1475 this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 1476 continue; 1477 } 1478 1479 final String presplit = "--presplit="; 1480 if (cmd.startsWith(presplit)) { 1481 this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 1482 continue; 1483 } 1484 1485 final String inMemory = "--inmemory="; 1486 if (cmd.startsWith(inMemory)) { 1487 this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 1488 continue; 1489 } 1490 1491 this.connection = ConnectionFactory.createConnection(getConf()); 1492 1493 final String useTags = "--usetags="; 1494 if (cmd.startsWith(useTags)) { 1495 this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 1496 continue; 1497 } 1498 1499 final String noOfTags = "--nooftags="; 1500 if (cmd.startsWith(noOfTags)) { 1501 this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 1502 continue; 1503 } 1504 1505 final String host = "--host="; 1506 if (cmd.startsWith(host)) { 1507 cluster.add(cmd.substring(host.length())); 1508 continue; 1509 } 1510 1511 Class<? extends Test> cmdClass = determineCommandClass(cmd); 1512 if (cmdClass != null) { 1513 getArgs(i + 1, args); 1514 if (cluster.isEmpty()) { 1515 String s = conf.get("stargate.hostname", "localhost"); 1516 if (s.contains(":")) { 1517 cluster.add(s); 1518 } else { 1519 cluster.add(s, conf.getInt("stargate.port", 8080)); 1520 } 1521 } 1522 runTest(cmdClass); 1523 errCode = 0; 1524 break; 1525 } 1526 1527 printUsage(); 1528 break; 1529 } 1530 } catch (Exception e) { 1531 LOG.error("Failed", e); 1532 } 1533 1534 return errCode; 1535 } 1536 determineCommandClass(String cmd)1537 private Class<? extends Test> determineCommandClass(String cmd) { 1538 CmdDescriptor descriptor = commands.get(cmd); 1539 return descriptor != null ? descriptor.getCmdClass() : null; 1540 } 1541 1542 /** 1543 * @param args 1544 */ main(final String[] args)1545 public static void main(final String[] args) throws Exception { 1546 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 1547 System.exit(res); 1548 } 1549 } 1550