1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.fs; 20 21 import java.io.BufferedReader; 22 import java.io.Closeable; 23 import java.io.DataInputStream; 24 import java.io.File; 25 import java.io.FileOutputStream; 26 import java.io.IOException; 27 import java.io.InputStream; 28 import java.io.InputStreamReader; 29 import java.io.OutputStream; 30 import java.io.PrintStream; 31 import java.util.Date; 32 import java.util.Random; 33 import java.util.StringTokenizer; 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 import org.apache.hadoop.conf.Configuration; 37 import org.apache.hadoop.hdfs.DFSConfigKeys; 38 import org.apache.hadoop.hdfs.MiniDFSCluster; 39 import org.apache.hadoop.io.LongWritable; 40 import org.apache.hadoop.io.SequenceFile; 41 import org.apache.hadoop.io.SequenceFile.CompressionType; 42 import org.apache.hadoop.io.Text; 43 import org.apache.hadoop.io.compress.CompressionCodec; 44 import org.apache.hadoop.mapred.FileInputFormat; 45 import org.apache.hadoop.mapred.FileOutputFormat; 46 import org.apache.hadoop.mapred.JobClient; 47 import org.apache.hadoop.mapred.JobConf; 48 import org.apache.hadoop.mapred.Mapper; 49 import org.apache.hadoop.mapred.OutputCollector; 50 import org.apache.hadoop.mapred.Reporter; 51 import org.apache.hadoop.mapred.SequenceFileInputFormat; 52 import org.apache.hadoop.util.ReflectionUtils; 53 import org.apache.hadoop.util.StringUtils; 54 import org.apache.hadoop.util.Tool; 55 import org.apache.hadoop.util.ToolRunner; 56 import org.junit.AfterClass; 57 import org.junit.BeforeClass; 58 import org.junit.Test; 59 60 /** 61 * Distributed i/o benchmark. 62 * <p> 63 * This test writes into or reads from a specified number of files. 64 * Number of bytes to write or read is specified as a parameter to the test. 65 * Each file is accessed in a separate map task. 66 * <p> 67 * The reducer collects the following statistics: 68 * <ul> 69 * <li>number of tasks completed</li> 70 * <li>number of bytes written/read</li> 71 * <li>execution time</li> 72 * <li>io rate</li> 73 * <li>io rate squared</li> 74 * </ul> 75 * 76 * Finally, the following information is appended to a local file 77 * <ul> 78 * <li>read or write test</li> 79 * <li>date and time the test finished</li> 80 * <li>number of files</li> 81 * <li>total number of bytes processed</li> 82 * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li> 83 * <li>average i/o rate in mb/sec per file</li> 84 * <li>standard deviation of i/o rate </li> 85 * </ul> 86 */ 87 public class TestDFSIO implements Tool { 88 // Constants 89 private static final Log LOG = LogFactory.getLog(TestDFSIO.class); 90 private static final int DEFAULT_BUFFER_SIZE = 1000000; 91 private static final String BASE_FILE_NAME = "test_io_"; 92 private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; 93 private static final long MEGA = ByteMultiple.MB.value(); 94 private static final int DEFAULT_NR_BYTES = 128; 95 private static final int DEFAULT_NR_FILES = 4; 96 private static final String USAGE = 97 "Usage: " + TestDFSIO.class.getSimpleName() + 98 " [genericOptions]" + 99 " -read [-random | -backward | -skip [-skipSize Size]] |" + 100 " -write | -append | -truncate | -clean" + 101 " [-compression codecClassName]" + 102 " [-nrFiles N]" + 103 " [-size Size[B|KB|MB|GB|TB]]" + 104 " [-resFile resultFileName] [-bufferSize Bytes]" + 105 " [-rootDir]"; 106 107 private Configuration config; 108 109 static{ 110 Configuration.addDefaultResource("hdfs-default.xml"); 111 Configuration.addDefaultResource("hdfs-site.xml"); 112 Configuration.addDefaultResource("mapred-default.xml"); 113 Configuration.addDefaultResource("mapred-site.xml"); 114 } 115 116 private static enum TestType { 117 TEST_TYPE_READ("read"), 118 TEST_TYPE_WRITE("write"), 119 TEST_TYPE_CLEANUP("cleanup"), 120 TEST_TYPE_APPEND("append"), 121 TEST_TYPE_READ_RANDOM("random read"), 122 TEST_TYPE_READ_BACKWARD("backward read"), 123 TEST_TYPE_READ_SKIP("skip read"), 124 TEST_TYPE_TRUNCATE("truncate"); 125 126 private String type; 127 TestType(String t)128 private TestType(String t) { 129 type = t; 130 } 131 132 @Override // String toString()133 public String toString() { 134 return type; 135 } 136 } 137 138 static enum ByteMultiple { 139 B(1L), 140 KB(0x400L), 141 MB(0x100000L), 142 GB(0x40000000L), 143 TB(0x10000000000L); 144 145 private long multiplier; 146 ByteMultiple(long mult)147 private ByteMultiple(long mult) { 148 multiplier = mult; 149 } 150 value()151 long value() { 152 return multiplier; 153 } 154 parseString(String sMultiple)155 static ByteMultiple parseString(String sMultiple) { 156 if(sMultiple == null || sMultiple.isEmpty()) // MB by default 157 return MB; 158 String sMU = StringUtils.toUpperCase(sMultiple); 159 if(StringUtils.toUpperCase(B.name()).endsWith(sMU)) 160 return B; 161 if(StringUtils.toUpperCase(KB.name()).endsWith(sMU)) 162 return KB; 163 if(StringUtils.toUpperCase(MB.name()).endsWith(sMU)) 164 return MB; 165 if(StringUtils.toUpperCase(GB.name()).endsWith(sMU)) 166 return GB; 167 if(StringUtils.toUpperCase(TB.name()).endsWith(sMU)) 168 return TB; 169 throw new IllegalArgumentException("Unsupported ByteMultiple "+sMultiple); 170 } 171 } 172 TestDFSIO()173 public TestDFSIO() { 174 this.config = new Configuration(); 175 } 176 getBaseDir(Configuration conf)177 private static String getBaseDir(Configuration conf) { 178 return conf.get("test.build.data","/benchmarks/TestDFSIO"); 179 } getControlDir(Configuration conf)180 private static Path getControlDir(Configuration conf) { 181 return new Path(getBaseDir(conf), "io_control"); 182 } getWriteDir(Configuration conf)183 private static Path getWriteDir(Configuration conf) { 184 return new Path(getBaseDir(conf), "io_write"); 185 } getReadDir(Configuration conf)186 private static Path getReadDir(Configuration conf) { 187 return new Path(getBaseDir(conf), "io_read"); 188 } getAppendDir(Configuration conf)189 private static Path getAppendDir(Configuration conf) { 190 return new Path(getBaseDir(conf), "io_append"); 191 } getRandomReadDir(Configuration conf)192 private static Path getRandomReadDir(Configuration conf) { 193 return new Path(getBaseDir(conf), "io_random_read"); 194 } getTruncateDir(Configuration conf)195 private static Path getTruncateDir(Configuration conf) { 196 return new Path(getBaseDir(conf), "io_truncate"); 197 } getDataDir(Configuration conf)198 private static Path getDataDir(Configuration conf) { 199 return new Path(getBaseDir(conf), "io_data"); 200 } 201 202 private static MiniDFSCluster cluster; 203 private static TestDFSIO bench; 204 205 @BeforeClass beforeClass()206 public static void beforeClass() throws Exception { 207 bench = new TestDFSIO(); 208 bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); 209 bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); 210 cluster = new MiniDFSCluster.Builder(bench.getConf()) 211 .numDataNodes(2) 212 .format(true) 213 .build(); 214 FileSystem fs = cluster.getFileSystem(); 215 bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES); 216 217 /** Check write here, as it is required for other tests */ 218 testWrite(); 219 } 220 221 @AfterClass afterClass()222 public static void afterClass() throws Exception { 223 if(cluster == null) 224 return; 225 FileSystem fs = cluster.getFileSystem(); 226 bench.cleanup(fs); 227 cluster.shutdown(); 228 } 229 testWrite()230 public static void testWrite() throws Exception { 231 FileSystem fs = cluster.getFileSystem(); 232 long tStart = System.currentTimeMillis(); 233 bench.writeTest(fs); 234 long execTime = System.currentTimeMillis() - tStart; 235 bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime); 236 } 237 238 @Test (timeout = 3000) testRead()239 public void testRead() throws Exception { 240 FileSystem fs = cluster.getFileSystem(); 241 long tStart = System.currentTimeMillis(); 242 bench.readTest(fs); 243 long execTime = System.currentTimeMillis() - tStart; 244 bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime); 245 } 246 247 @Test (timeout = 3000) testReadRandom()248 public void testReadRandom() throws Exception { 249 FileSystem fs = cluster.getFileSystem(); 250 long tStart = System.currentTimeMillis(); 251 bench.getConf().setLong("test.io.skip.size", 0); 252 bench.randomReadTest(fs); 253 long execTime = System.currentTimeMillis() - tStart; 254 bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime); 255 } 256 257 @Test (timeout = 3000) testReadBackward()258 public void testReadBackward() throws Exception { 259 FileSystem fs = cluster.getFileSystem(); 260 long tStart = System.currentTimeMillis(); 261 bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE); 262 bench.randomReadTest(fs); 263 long execTime = System.currentTimeMillis() - tStart; 264 bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime); 265 } 266 267 @Test (timeout = 3000) testReadSkip()268 public void testReadSkip() throws Exception { 269 FileSystem fs = cluster.getFileSystem(); 270 long tStart = System.currentTimeMillis(); 271 bench.getConf().setLong("test.io.skip.size", 1); 272 bench.randomReadTest(fs); 273 long execTime = System.currentTimeMillis() - tStart; 274 bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime); 275 } 276 277 @Test (timeout = 6000) testAppend()278 public void testAppend() throws Exception { 279 FileSystem fs = cluster.getFileSystem(); 280 long tStart = System.currentTimeMillis(); 281 bench.appendTest(fs); 282 long execTime = System.currentTimeMillis() - tStart; 283 bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime); 284 } 285 286 @Test (timeout = 60000) testTruncate()287 public void testTruncate() throws Exception { 288 FileSystem fs = cluster.getFileSystem(); 289 bench.createControlFile(fs, DEFAULT_NR_BYTES / 2, DEFAULT_NR_FILES); 290 long tStart = System.currentTimeMillis(); 291 bench.truncateTest(fs); 292 long execTime = System.currentTimeMillis() - tStart; 293 bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime); 294 } 295 296 @SuppressWarnings("deprecation") createControlFile(FileSystem fs, long nrBytes, int nrFiles )297 private void createControlFile(FileSystem fs, 298 long nrBytes, // in bytes 299 int nrFiles 300 ) throws IOException { 301 LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files"); 302 303 Path controlDir = getControlDir(config); 304 fs.delete(controlDir, true); 305 306 for(int i=0; i < nrFiles; i++) { 307 String name = getFileName(i); 308 Path controlFile = new Path(controlDir, "in_file_" + name); 309 SequenceFile.Writer writer = null; 310 try { 311 writer = SequenceFile.createWriter(fs, config, controlFile, 312 Text.class, LongWritable.class, 313 CompressionType.NONE); 314 writer.append(new Text(name), new LongWritable(nrBytes)); 315 } catch(Exception e) { 316 throw new IOException(e.getLocalizedMessage()); 317 } finally { 318 if (writer != null) 319 writer.close(); 320 writer = null; 321 } 322 } 323 LOG.info("created control files for: "+nrFiles+" files"); 324 } 325 getFileName(int fIdx)326 private static String getFileName(int fIdx) { 327 return BASE_FILE_NAME + Integer.toString(fIdx); 328 } 329 330 /** 331 * Write/Read mapper base class. 332 * <p> 333 * Collects the following statistics per task: 334 * <ul> 335 * <li>number of tasks completed</li> 336 * <li>number of bytes written/read</li> 337 * <li>execution time</li> 338 * <li>i/o rate</li> 339 * <li>i/o rate squared</li> 340 * </ul> 341 */ 342 private abstract static class IOStatMapper extends IOMapperBase<Long> { 343 protected CompressionCodec compressionCodec; 344 IOStatMapper()345 IOStatMapper() { 346 } 347 348 @Override // Mapper configure(JobConf conf)349 public void configure(JobConf conf) { 350 super.configure(conf); 351 352 // grab compression 353 String compression = getConf().get("test.io.compression.class", null); 354 Class<? extends CompressionCodec> codec; 355 356 // try to initialize codec 357 try { 358 codec = (compression == null) ? null : 359 Class.forName(compression).asSubclass(CompressionCodec.class); 360 } catch(Exception e) { 361 throw new RuntimeException("Compression codec not found: ", e); 362 } 363 364 if(codec != null) { 365 compressionCodec = (CompressionCodec) 366 ReflectionUtils.newInstance(codec, getConf()); 367 } 368 } 369 370 @Override // IOMapperBase collectStats(OutputCollector<Text, Text> output, String name, long execTime, Long objSize)371 void collectStats(OutputCollector<Text, Text> output, 372 String name, 373 long execTime, 374 Long objSize) throws IOException { 375 long totalSize = objSize.longValue(); 376 float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); 377 LOG.info("Number of bytes processed = " + totalSize); 378 LOG.info("Exec time = " + execTime); 379 LOG.info("IO rate = " + ioRateMbSec); 380 381 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), 382 new Text(String.valueOf(1))); 383 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), 384 new Text(String.valueOf(totalSize))); 385 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), 386 new Text(String.valueOf(execTime))); 387 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), 388 new Text(String.valueOf(ioRateMbSec*1000))); 389 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), 390 new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); 391 } 392 } 393 394 /** 395 * Write mapper class. 396 */ 397 public static class WriteMapper extends IOStatMapper { 398 WriteMapper()399 public WriteMapper() { 400 for(int i=0; i < bufferSize; i++) 401 buffer[i] = (byte)('0' + i % 50); 402 } 403 404 @Override // IOMapperBase getIOStream(String name)405 public Closeable getIOStream(String name) throws IOException { 406 // create file 407 OutputStream out = 408 fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); 409 if(compressionCodec != null) 410 out = compressionCodec.createOutputStream(out); 411 LOG.info("out = " + out.getClass().getName()); 412 return out; 413 } 414 415 @Override // IOMapperBase doIO(Reporter reporter, String name, long totalSize )416 public Long doIO(Reporter reporter, 417 String name, 418 long totalSize // in bytes 419 ) throws IOException { 420 OutputStream out = (OutputStream)this.stream; 421 // write to the file 422 long nrRemaining; 423 for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { 424 int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 425 out.write(buffer, 0, curSize); 426 reporter.setStatus("writing " + name + "@" + 427 (totalSize - nrRemaining) + "/" + totalSize 428 + " ::host = " + hostName); 429 } 430 return Long.valueOf(totalSize); 431 } 432 } 433 writeTest(FileSystem fs)434 private void writeTest(FileSystem fs) throws IOException { 435 Path writeDir = getWriteDir(config); 436 fs.delete(getDataDir(config), true); 437 fs.delete(writeDir, true); 438 439 runIOTest(WriteMapper.class, writeDir); 440 } 441 runIOTest( Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, Path outputDir)442 private void runIOTest( 443 Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 444 Path outputDir) throws IOException { 445 JobConf job = new JobConf(config, TestDFSIO.class); 446 447 FileInputFormat.setInputPaths(job, getControlDir(config)); 448 job.setInputFormat(SequenceFileInputFormat.class); 449 450 job.setMapperClass(mapperClass); 451 job.setReducerClass(AccumulatingReducer.class); 452 453 FileOutputFormat.setOutputPath(job, outputDir); 454 job.setOutputKeyClass(Text.class); 455 job.setOutputValueClass(Text.class); 456 job.setNumReduceTasks(1); 457 JobClient.runJob(job); 458 } 459 460 /** 461 * Append mapper class. 462 */ 463 public static class AppendMapper extends IOStatMapper { 464 AppendMapper()465 public AppendMapper() { 466 for(int i=0; i < bufferSize; i++) 467 buffer[i] = (byte)('0' + i % 50); 468 } 469 470 @Override // IOMapperBase getIOStream(String name)471 public Closeable getIOStream(String name) throws IOException { 472 // open file for append 473 OutputStream out = 474 fs.append(new Path(getDataDir(getConf()), name), bufferSize); 475 if(compressionCodec != null) 476 out = compressionCodec.createOutputStream(out); 477 LOG.info("out = " + out.getClass().getName()); 478 return out; 479 } 480 481 @Override // IOMapperBase doIO(Reporter reporter, String name, long totalSize )482 public Long doIO(Reporter reporter, 483 String name, 484 long totalSize // in bytes 485 ) throws IOException { 486 OutputStream out = (OutputStream)this.stream; 487 // write to the file 488 long nrRemaining; 489 for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { 490 int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 491 out.write(buffer, 0, curSize); 492 reporter.setStatus("writing " + name + "@" + 493 (totalSize - nrRemaining) + "/" + totalSize 494 + " ::host = " + hostName); 495 } 496 return Long.valueOf(totalSize); 497 } 498 } 499 appendTest(FileSystem fs)500 private void appendTest(FileSystem fs) throws IOException { 501 Path appendDir = getAppendDir(config); 502 fs.delete(appendDir, true); 503 runIOTest(AppendMapper.class, appendDir); 504 } 505 506 /** 507 * Read mapper class. 508 */ 509 public static class ReadMapper extends IOStatMapper { 510 ReadMapper()511 public ReadMapper() { 512 } 513 514 @Override // IOMapperBase getIOStream(String name)515 public Closeable getIOStream(String name) throws IOException { 516 // open file 517 InputStream in = fs.open(new Path(getDataDir(getConf()), name)); 518 if(compressionCodec != null) 519 in = compressionCodec.createInputStream(in); 520 LOG.info("in = " + in.getClass().getName()); 521 return in; 522 } 523 524 @Override // IOMapperBase doIO(Reporter reporter, String name, long totalSize )525 public Long doIO(Reporter reporter, 526 String name, 527 long totalSize // in bytes 528 ) throws IOException { 529 InputStream in = (InputStream)this.stream; 530 long actualSize = 0; 531 while (actualSize < totalSize) { 532 int curSize = in.read(buffer, 0, bufferSize); 533 if(curSize < 0) break; 534 actualSize += curSize; 535 reporter.setStatus("reading " + name + "@" + 536 actualSize + "/" + totalSize 537 + " ::host = " + hostName); 538 } 539 return Long.valueOf(actualSize); 540 } 541 } 542 readTest(FileSystem fs)543 private void readTest(FileSystem fs) throws IOException { 544 Path readDir = getReadDir(config); 545 fs.delete(readDir, true); 546 runIOTest(ReadMapper.class, readDir); 547 } 548 549 /** 550 * Mapper class for random reads. 551 * The mapper chooses a position in the file and reads bufferSize 552 * bytes starting at the chosen position. 553 * It stops after reading the totalSize bytes, specified by -size. 554 * 555 * There are three type of reads. 556 * 1) Random read always chooses a random position to read from: skipSize = 0 557 * 2) Backward read reads file in reverse order : skipSize < 0 558 * 3) Skip-read skips skipSize bytes after every read : skipSize > 0 559 */ 560 public static class RandomReadMapper extends IOStatMapper { 561 private Random rnd; 562 private long fileSize; 563 private long skipSize; 564 565 @Override // Mapper configure(JobConf conf)566 public void configure(JobConf conf) { 567 super.configure(conf); 568 skipSize = conf.getLong("test.io.skip.size", 0); 569 } 570 RandomReadMapper()571 public RandomReadMapper() { 572 rnd = new Random(); 573 } 574 575 @Override // IOMapperBase getIOStream(String name)576 public Closeable getIOStream(String name) throws IOException { 577 Path filePath = new Path(getDataDir(getConf()), name); 578 this.fileSize = fs.getFileStatus(filePath).getLen(); 579 InputStream in = fs.open(filePath); 580 if(compressionCodec != null) 581 in = new FSDataInputStream(compressionCodec.createInputStream(in)); 582 LOG.info("in = " + in.getClass().getName()); 583 LOG.info("skipSize = " + skipSize); 584 return in; 585 } 586 587 @Override // IOMapperBase doIO(Reporter reporter, String name, long totalSize )588 public Long doIO(Reporter reporter, 589 String name, 590 long totalSize // in bytes 591 ) throws IOException { 592 PositionedReadable in = (PositionedReadable)this.stream; 593 long actualSize = 0; 594 for(long pos = nextOffset(-1); 595 actualSize < totalSize; pos = nextOffset(pos)) { 596 int curSize = in.read(pos, buffer, 0, bufferSize); 597 if(curSize < 0) break; 598 actualSize += curSize; 599 reporter.setStatus("reading " + name + "@" + 600 actualSize + "/" + totalSize 601 + " ::host = " + hostName); 602 } 603 return Long.valueOf(actualSize); 604 } 605 606 /** 607 * Get next offset for reading. 608 * If current < 0 then choose initial offset according to the read type. 609 * 610 * @param current offset 611 * @return 612 */ nextOffset(long current)613 private long nextOffset(long current) { 614 if(skipSize == 0) 615 return rnd.nextInt((int)(fileSize)); 616 if(skipSize > 0) 617 return (current < 0) ? 0 : (current + bufferSize + skipSize); 618 // skipSize < 0 619 return (current < 0) ? Math.max(0, fileSize - bufferSize) : 620 Math.max(0, current + skipSize); 621 } 622 } 623 randomReadTest(FileSystem fs)624 private void randomReadTest(FileSystem fs) throws IOException { 625 Path readDir = getRandomReadDir(config); 626 fs.delete(readDir, true); 627 runIOTest(RandomReadMapper.class, readDir); 628 } 629 630 /** 631 * Truncate mapper class. 632 * The mapper truncates given file to the newLength, specified by -size. 633 */ 634 public static class TruncateMapper extends IOStatMapper { 635 private static final long DELAY = 100L; 636 637 private Path filePath; 638 private long fileSize; 639 640 @Override // IOMapperBase getIOStream(String name)641 public Closeable getIOStream(String name) throws IOException { 642 filePath = new Path(getDataDir(getConf()), name); 643 fileSize = fs.getFileStatus(filePath).getLen(); 644 return null; 645 } 646 647 @Override // IOMapperBase doIO(Reporter reporter, String name, long newLength )648 public Long doIO(Reporter reporter, 649 String name, 650 long newLength // in bytes 651 ) throws IOException { 652 boolean isClosed = fs.truncate(filePath, newLength); 653 reporter.setStatus("truncating " + name + " to newLength " + 654 newLength + " ::host = " + hostName); 655 for(int i = 0; !isClosed; i++) { 656 try { 657 Thread.sleep(DELAY); 658 } catch (InterruptedException ignored) {} 659 FileStatus status = fs.getFileStatus(filePath); 660 assert status != null : "status is null"; 661 isClosed = (status.getLen() == newLength); 662 reporter.setStatus("truncate recover for " + name + " to newLength " + 663 newLength + " attempt " + i + " ::host = " + hostName); 664 } 665 return Long.valueOf(fileSize - newLength); 666 } 667 } 668 truncateTest(FileSystem fs)669 private void truncateTest(FileSystem fs) throws IOException { 670 Path TruncateDir = getTruncateDir(config); 671 fs.delete(TruncateDir, true); 672 runIOTest(TruncateMapper.class, TruncateDir); 673 } 674 sequentialTest(FileSystem fs, TestType testType, long fileSize, int nrFiles )675 private void sequentialTest(FileSystem fs, 676 TestType testType, 677 long fileSize, // in bytes 678 int nrFiles 679 ) throws IOException { 680 IOStatMapper ioer = null; 681 switch(testType) { 682 case TEST_TYPE_READ: 683 ioer = new ReadMapper(); 684 break; 685 case TEST_TYPE_WRITE: 686 ioer = new WriteMapper(); 687 break; 688 case TEST_TYPE_APPEND: 689 ioer = new AppendMapper(); 690 break; 691 case TEST_TYPE_READ_RANDOM: 692 case TEST_TYPE_READ_BACKWARD: 693 case TEST_TYPE_READ_SKIP: 694 ioer = new RandomReadMapper(); 695 break; 696 case TEST_TYPE_TRUNCATE: 697 ioer = new TruncateMapper(); 698 break; 699 default: 700 return; 701 } 702 for(int i=0; i < nrFiles; i++) 703 ioer.doIO(Reporter.NULL, 704 BASE_FILE_NAME+Integer.toString(i), 705 fileSize); 706 } 707 main(String[] args)708 public static void main(String[] args) { 709 TestDFSIO bench = new TestDFSIO(); 710 int res = -1; 711 try { 712 res = ToolRunner.run(bench, args); 713 } catch(Exception e) { 714 System.err.print(StringUtils.stringifyException(e)); 715 res = -2; 716 } 717 if(res == -1) 718 System.err.print(USAGE); 719 System.exit(res); 720 } 721 722 @Override // Tool run(String[] args)723 public int run(String[] args) throws IOException { 724 TestType testType = null; 725 int bufferSize = DEFAULT_BUFFER_SIZE; 726 long nrBytes = 1*MEGA; 727 int nrFiles = 1; 728 long skipSize = 0; 729 String resFileName = DEFAULT_RES_FILE_NAME; 730 String compressionClass = null; 731 boolean isSequential = false; 732 String version = TestDFSIO.class.getSimpleName() + ".1.8"; 733 734 LOG.info(version); 735 if (args.length == 0) { 736 System.err.println("Missing arguments."); 737 return -1; 738 } 739 740 for (int i = 0; i < args.length; i++) { // parse command line 741 if (args[i].startsWith("-read")) { 742 testType = TestType.TEST_TYPE_READ; 743 } else if (args[i].equals("-write")) { 744 testType = TestType.TEST_TYPE_WRITE; 745 } else if (args[i].equals("-append")) { 746 testType = TestType.TEST_TYPE_APPEND; 747 } else if (args[i].equals("-random")) { 748 if(testType != TestType.TEST_TYPE_READ) return -1; 749 testType = TestType.TEST_TYPE_READ_RANDOM; 750 } else if (args[i].equals("-backward")) { 751 if(testType != TestType.TEST_TYPE_READ) return -1; 752 testType = TestType.TEST_TYPE_READ_BACKWARD; 753 } else if (args[i].equals("-skip")) { 754 if(testType != TestType.TEST_TYPE_READ) return -1; 755 testType = TestType.TEST_TYPE_READ_SKIP; 756 } else if (args[i].equalsIgnoreCase("-truncate")) { 757 testType = TestType.TEST_TYPE_TRUNCATE; 758 } else if (args[i].equals("-clean")) { 759 testType = TestType.TEST_TYPE_CLEANUP; 760 } else if (args[i].startsWith("-seq")) { 761 isSequential = true; 762 } else if (args[i].startsWith("-compression")) { 763 compressionClass = args[++i]; 764 } else if (args[i].equals("-nrFiles")) { 765 nrFiles = Integer.parseInt(args[++i]); 766 } else if (args[i].equals("-fileSize") || args[i].equals("-size")) { 767 nrBytes = parseSize(args[++i]); 768 } else if (args[i].equals("-skipSize")) { 769 skipSize = parseSize(args[++i]); 770 } else if (args[i].equals("-bufferSize")) { 771 bufferSize = Integer.parseInt(args[++i]); 772 } else if (args[i].equals("-resFile")) { 773 resFileName = args[++i]; 774 } else { 775 System.err.println("Illegal argument: " + args[i]); 776 return -1; 777 } 778 } 779 if(testType == null) 780 return -1; 781 if(testType == TestType.TEST_TYPE_READ_BACKWARD) 782 skipSize = -bufferSize; 783 else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) 784 skipSize = bufferSize; 785 786 LOG.info("nrFiles = " + nrFiles); 787 LOG.info("nrBytes (MB) = " + toMB(nrBytes)); 788 LOG.info("bufferSize = " + bufferSize); 789 if(skipSize > 0) 790 LOG.info("skipSize = " + skipSize); 791 LOG.info("baseDir = " + getBaseDir(config)); 792 793 if(compressionClass != null) { 794 config.set("test.io.compression.class", compressionClass); 795 LOG.info("compressionClass = " + compressionClass); 796 } 797 798 config.setInt("test.io.file.buffer.size", bufferSize); 799 config.setLong("test.io.skip.size", skipSize); 800 config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); 801 FileSystem fs = FileSystem.get(config); 802 803 if (isSequential) { 804 long tStart = System.currentTimeMillis(); 805 sequentialTest(fs, testType, nrBytes, nrFiles); 806 long execTime = System.currentTimeMillis() - tStart; 807 String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; 808 LOG.info(resultLine); 809 return 0; 810 } 811 if (testType == TestType.TEST_TYPE_CLEANUP) { 812 cleanup(fs); 813 return 0; 814 } 815 createControlFile(fs, nrBytes, nrFiles); 816 long tStart = System.currentTimeMillis(); 817 switch(testType) { 818 case TEST_TYPE_WRITE: 819 writeTest(fs); 820 break; 821 case TEST_TYPE_READ: 822 readTest(fs); 823 break; 824 case TEST_TYPE_APPEND: 825 appendTest(fs); 826 break; 827 case TEST_TYPE_READ_RANDOM: 828 case TEST_TYPE_READ_BACKWARD: 829 case TEST_TYPE_READ_SKIP: 830 randomReadTest(fs); 831 break; 832 case TEST_TYPE_TRUNCATE: 833 truncateTest(fs); 834 break; 835 default: 836 } 837 long execTime = System.currentTimeMillis() - tStart; 838 839 analyzeResult(fs, testType, execTime, resFileName); 840 return 0; 841 } 842 843 @Override // Configurable getConf()844 public Configuration getConf() { 845 return this.config; 846 } 847 848 @Override // Configurable setConf(Configuration conf)849 public void setConf(Configuration conf) { 850 this.config = conf; 851 } 852 853 /** 854 * Returns size in bytes. 855 * 856 * @param arg = {d}[B|KB|MB|GB|TB] 857 * @return 858 */ parseSize(String arg)859 static long parseSize(String arg) { 860 String[] args = arg.split("\\D", 2); // get digits 861 assert args.length <= 2; 862 long nrBytes = Long.parseLong(args[0]); 863 String bytesMult = arg.substring(args[0].length()); // get byte multiple 864 return nrBytes * ByteMultiple.parseString(bytesMult).value(); 865 } 866 toMB(long bytes)867 static float toMB(long bytes) { 868 return ((float)bytes)/MEGA; 869 } 870 analyzeResult( FileSystem fs, TestType testType, long execTime, String resFileName )871 private void analyzeResult( FileSystem fs, 872 TestType testType, 873 long execTime, 874 String resFileName 875 ) throws IOException { 876 Path reduceFile = getReduceFilePath(testType); 877 long tasks = 0; 878 long size = 0; 879 long time = 0; 880 float rate = 0; 881 float sqrate = 0; 882 DataInputStream in = null; 883 BufferedReader lines = null; 884 try { 885 in = new DataInputStream(fs.open(reduceFile)); 886 lines = new BufferedReader(new InputStreamReader(in)); 887 String line; 888 while((line = lines.readLine()) != null) { 889 StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); 890 String attr = tokens.nextToken(); 891 if (attr.endsWith(":tasks")) 892 tasks = Long.parseLong(tokens.nextToken()); 893 else if (attr.endsWith(":size")) 894 size = Long.parseLong(tokens.nextToken()); 895 else if (attr.endsWith(":time")) 896 time = Long.parseLong(tokens.nextToken()); 897 else if (attr.endsWith(":rate")) 898 rate = Float.parseFloat(tokens.nextToken()); 899 else if (attr.endsWith(":sqrate")) 900 sqrate = Float.parseFloat(tokens.nextToken()); 901 } 902 } finally { 903 if(in != null) in.close(); 904 if(lines != null) lines.close(); 905 } 906 907 double med = rate / 1000 / tasks; 908 double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); 909 String resultLines[] = { 910 "----- TestDFSIO ----- : " + testType, 911 " Date & time: " + new Date(System.currentTimeMillis()), 912 " Number of files: " + tasks, 913 "Total MBytes processed: " + toMB(size), 914 " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), 915 "Average IO rate mb/sec: " + med, 916 " IO rate std deviation: " + stdDev, 917 " Test exec time sec: " + (float)execTime / 1000, 918 "" }; 919 920 PrintStream res = null; 921 try { 922 res = new PrintStream(new FileOutputStream(new File(resFileName), true)); 923 for(int i = 0; i < resultLines.length; i++) { 924 LOG.info(resultLines[i]); 925 res.println(resultLines[i]); 926 } 927 } finally { 928 if(res != null) res.close(); 929 } 930 } 931 getReduceFilePath(TestType testType)932 private Path getReduceFilePath(TestType testType) { 933 switch(testType) { 934 case TEST_TYPE_WRITE: 935 return new Path(getWriteDir(config), "part-00000"); 936 case TEST_TYPE_APPEND: 937 return new Path(getAppendDir(config), "part-00000"); 938 case TEST_TYPE_READ: 939 return new Path(getReadDir(config), "part-00000"); 940 case TEST_TYPE_READ_RANDOM: 941 case TEST_TYPE_READ_BACKWARD: 942 case TEST_TYPE_READ_SKIP: 943 return new Path(getRandomReadDir(config), "part-00000"); 944 case TEST_TYPE_TRUNCATE: 945 return new Path(getTruncateDir(config), "part-00000"); 946 default: 947 } 948 return null; 949 } 950 analyzeResult(FileSystem fs, TestType testType, long execTime)951 private void analyzeResult(FileSystem fs, TestType testType, long execTime) 952 throws IOException { 953 String dir = System.getProperty("test.build.dir", "target/test-dir"); 954 analyzeResult(fs, testType, execTime, dir + "/" + DEFAULT_RES_FILE_NAME); 955 } 956 cleanup(FileSystem fs)957 private void cleanup(FileSystem fs) 958 throws IOException { 959 LOG.info("Cleaning up test files"); 960 fs.delete(new Path(getBaseDir(config)), true); 961 } 962 } 963