/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
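// Typical command lines (illustrative; they mirror the usage string printed
// by run() below, and assume the libhdfs binaries have been staged under
// $HADOOP_HOME/libhdfs as run() expects):
//
//   hadoop org.apache.hadoop.fs.DFSCIOTest -write -nrFiles 10 -fileSize 100
//   hadoop org.apache.hadoop.fs.DFSCIOTest -read  -nrFiles 10 -fileSize 100
//   hadoop org.apache.hadoop.fs.DFSCIOTest -clean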
/**
 * Distributed i/o benchmark.
 * <p>
 * This test writes into or reads from a specified number of files.
 * File size is specified as a parameter to the test.
 * Each file is accessed in a separate map task.
 * <p>
 * The reducer collects the following statistics:
 * <ul>
 * <li>number of tasks completed</li>
 * <li>number of bytes written/read</li>
 * <li>execution time</li>
 * <li>io rate</li>
 * <li>io rate squared</li>
 * </ul>
 *
 * Finally, the following information is appended to a local file:
 * <ul>
 * <li>read or write test</li>
 * <li>date and time the test finished</li>
 * <li>number of files</li>
 * <li>total number of bytes processed</li>
 * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
 * <li>average i/o rate in mb/sec per file</li>
 * <li>standard i/o rate deviation</li>
 * </ul>
 */
public class DFSCIOTest extends Configured implements Tool {
  // Constants
  private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);
  private static final int TEST_TYPE_READ = 0;
  private static final int TEST_TYPE_WRITE = 1;
  private static final int TEST_TYPE_CLEANUP = 2;
  private static final int DEFAULT_BUFFER_SIZE = 1000000;
  private static final String BASE_FILE_NAME = "test_io_";
  private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";

  private static Configuration fsConfig = new Configuration();
  private static final long MEGA = 0x100000;
  private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest");
  private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
  private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
  private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");

  private static Path HDFS_TEST_DIR = new Path("/tmp/DFSCIOTest");
  private static String HDFS_LIB_VERSION = System.getProperty("libhdfs.version", "1");
  private static String CHMOD = "chmod";
  private static Path HDFS_SHLIB = new Path(HDFS_TEST_DIR + "/libhdfs.so." + HDFS_LIB_VERSION);
  private static Path HDFS_READ = new Path(HDFS_TEST_DIR + "/hdfs_read");
  private static Path HDFS_WRITE = new Path(HDFS_TEST_DIR + "/hdfs_write");

  /**
   * Run the test with default parameters.
   *
   * @throws Exception
   */
  @Test
  public void testIOs() throws Exception {
    testIOs(10, 10);
  }
108 * 109 * @param fileSize file size 110 * @param nrFiles number of files 111 * @throws IOException 112 */ testIOs(int fileSize, int nrFiles)113 public static void testIOs(int fileSize, int nrFiles) 114 throws IOException { 115 116 FileSystem fs = FileSystem.get(fsConfig); 117 118 createControlFile(fs, fileSize, nrFiles); 119 writeTest(fs); 120 readTest(fs); 121 } 122 createControlFile( FileSystem fs, int fileSize, int nrFiles )123 private static void createControlFile( 124 FileSystem fs, 125 int fileSize, // in MB 126 int nrFiles 127 ) throws IOException { 128 LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); 129 130 fs.delete(CONTROL_DIR, true); 131 132 for(int i=0; i < nrFiles; i++) { 133 String name = getFileName(i); 134 Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); 135 SequenceFile.Writer writer = null; 136 try { 137 writer = SequenceFile.createWriter(fs, fsConfig, controlFile, 138 Text.class, LongWritable.class, 139 CompressionType.NONE); 140 writer.append(new Text(name), new LongWritable(fileSize)); 141 } catch(Exception e) { 142 throw new IOException(e.getLocalizedMessage()); 143 } finally { 144 if (writer != null) 145 writer.close(); 146 writer = null; 147 } 148 } 149 LOG.info("created control files for: "+nrFiles+" files"); 150 } 151 getFileName(int fIdx)152 private static String getFileName(int fIdx) { 153 return BASE_FILE_NAME + Integer.toString(fIdx); 154 } 155 156 /** 157 * Write/Read mapper base class. 158 * <p> 159 * Collects the following statistics per task: 160 * <ul> 161 * <li>number of tasks completed</li> 162 * <li>number of bytes written/read</li> 163 * <li>execution time</li> 164 * <li>i/o rate</li> 165 * <li>i/o rate squared</li> 166 * </ul> 167 */ 168 private abstract static class IOStatMapper extends IOMapperBase<Long> { IOStatMapper()169 IOStatMapper() { 170 } 171 collectStats(OutputCollector<Text, Text> output, String name, long execTime, Long objSize)172 void collectStats(OutputCollector<Text, Text> output, 173 String name, 174 long execTime, 175 Long objSize) throws IOException { 176 long totalSize = objSize.longValue(); 177 float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); 178 LOG.info("Number of bytes processed = " + totalSize); 179 LOG.info("Exec time = " + execTime); 180 LOG.info("IO rate = " + ioRateMbSec); 181 182 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), 183 new Text(String.valueOf(1))); 184 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), 185 new Text(String.valueOf(totalSize))); 186 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), 187 new Text(String.valueOf(execTime))); 188 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), 189 new Text(String.valueOf(ioRateMbSec*1000))); 190 output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), 191 new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); 192 } 193 } 194 195 /** 196 * Write mapper class. 
197 */ 198 public static class WriteMapper extends IOStatMapper { 199 WriteMapper()200 public WriteMapper() { 201 super(); 202 for(int i=0; i < bufferSize; i++) 203 buffer[i] = (byte)('0' + i % 50); 204 } 205 doIO(Reporter reporter, String name, long totalSize )206 public Long doIO(Reporter reporter, 207 String name, 208 long totalSize 209 ) throws IOException { 210 // create file 211 totalSize *= MEGA; 212 213 // create instance of local filesystem 214 FileSystem localFS = FileSystem.getLocal(fsConfig); 215 216 try { 217 // native runtime 218 Runtime runTime = Runtime.getRuntime(); 219 220 // copy the dso and executable from dfs and chmod them 221 synchronized (this) { 222 localFS.delete(HDFS_TEST_DIR, true); 223 if (!(localFS.mkdirs(HDFS_TEST_DIR))) { 224 throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); 225 } 226 } 227 228 synchronized (this) { 229 if (!localFS.exists(HDFS_SHLIB)) { 230 FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig); 231 232 String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); 233 Process process = runTime.exec(chmodCmd); 234 int exitStatus = process.waitFor(); 235 if (exitStatus != 0) { 236 throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); 237 } 238 } 239 } 240 241 synchronized (this) { 242 if (!localFS.exists(HDFS_WRITE)) { 243 FileUtil.copy(fs, HDFS_WRITE, localFS, HDFS_WRITE, false, fsConfig); 244 245 String chmodCmd = new String(CHMOD + " a+x " + HDFS_WRITE); 246 Process process = runTime.exec(chmodCmd); 247 int exitStatus = process.waitFor(); 248 if (exitStatus != 0) { 249 throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); 250 } 251 } 252 } 253 254 // exec the C program 255 Path outFile = new Path(DATA_DIR, name); 256 String writeCmd = new String(HDFS_WRITE + " " + outFile + " " + totalSize + " " + bufferSize); 257 Process process = runTime.exec(writeCmd, null, new File(HDFS_TEST_DIR.toString())); 258 int exitStatus = process.waitFor(); 259 if (exitStatus != 0) { 260 throw new IOException(writeCmd + ": Failed with exitStatus: " + exitStatus); 261 } 262 } catch (InterruptedException interruptedException) { 263 reporter.setStatus(interruptedException.toString()); 264 } finally { 265 localFS.close(); 266 } 267 return new Long(totalSize); 268 } 269 } 270 writeTest(FileSystem fs)271 private static void writeTest(FileSystem fs) 272 throws IOException { 273 274 fs.delete(DATA_DIR, true); 275 fs.delete(WRITE_DIR, true); 276 277 runIOTest(WriteMapper.class, WRITE_DIR); 278 } 279 runIOTest( Class<? extends Mapper> mapperClass, Path outputDir )280 private static void runIOTest( Class<? extends Mapper> mapperClass, 281 Path outputDir 282 ) throws IOException { 283 JobConf job = new JobConf(fsConfig, DFSCIOTest.class); 284 285 FileInputFormat.setInputPaths(job, CONTROL_DIR); 286 job.setInputFormat(SequenceFileInputFormat.class); 287 288 job.setMapperClass(mapperClass); 289 job.setReducerClass(AccumulatingReducer.class); 290 291 FileOutputFormat.setOutputPath(job, outputDir); 292 job.setOutputKeyClass(Text.class); 293 job.setOutputValueClass(Text.class); 294 job.setNumReduceTasks(1); 295 JobClient.runJob(job); 296 } 297 298 /** 299 * Read mapper class. 
300 */ 301 public static class ReadMapper extends IOStatMapper { 302 ReadMapper()303 public ReadMapper() { 304 super(); 305 } 306 doIO(Reporter reporter, String name, long totalSize )307 public Long doIO(Reporter reporter, 308 String name, 309 long totalSize 310 ) throws IOException { 311 totalSize *= MEGA; 312 313 // create instance of local filesystem 314 FileSystem localFS = FileSystem.getLocal(fsConfig); 315 316 try { 317 // native runtime 318 Runtime runTime = Runtime.getRuntime(); 319 320 // copy the dso and executable from dfs 321 synchronized (this) { 322 localFS.delete(HDFS_TEST_DIR, true); 323 if (!(localFS.mkdirs(HDFS_TEST_DIR))) { 324 throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); 325 } 326 } 327 328 synchronized (this) { 329 if (!localFS.exists(HDFS_SHLIB)) { 330 if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) { 331 throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem"); 332 } 333 334 String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); 335 Process process = runTime.exec(chmodCmd); 336 int exitStatus = process.waitFor(); 337 if (exitStatus != 0) { 338 throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); 339 } 340 } 341 } 342 343 synchronized (this) { 344 if (!localFS.exists(HDFS_READ)) { 345 if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) { 346 throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem"); 347 } 348 349 String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ); 350 Process process = runTime.exec(chmodCmd); 351 int exitStatus = process.waitFor(); 352 353 if (exitStatus != 0) { 354 throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); 355 } 356 } 357 } 358 359 // exec the C program 360 Path inFile = new Path(DATA_DIR, name); 361 String readCmd = new String(HDFS_READ + " " + inFile + " " + totalSize + " " + 362 bufferSize); 363 Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString())); 364 int exitStatus = process.waitFor(); 365 366 if (exitStatus != 0) { 367 throw new IOException(HDFS_READ + ": Failed with exitStatus: " + exitStatus); 368 } 369 } catch (InterruptedException interruptedException) { 370 reporter.setStatus(interruptedException.toString()); 371 } finally { 372 localFS.close(); 373 } 374 return new Long(totalSize); 375 } 376 } 377 readTest(FileSystem fs)378 private static void readTest(FileSystem fs) throws IOException { 379 fs.delete(READ_DIR, true); 380 runIOTest(ReadMapper.class, READ_DIR); 381 } 382 sequentialTest( FileSystem fs, int testType, int fileSize, int nrFiles )383 private static void sequentialTest( 384 FileSystem fs, 385 int testType, 386 int fileSize, 387 int nrFiles 388 ) throws Exception { 389 IOStatMapper ioer = null; 390 if (testType == TEST_TYPE_READ) 391 ioer = new ReadMapper(); 392 else if (testType == TEST_TYPE_WRITE) 393 ioer = new WriteMapper(); 394 else 395 return; 396 for(int i=0; i < nrFiles; i++) 397 ioer.doIO(Reporter.NULL, 398 BASE_FILE_NAME+Integer.toString(i), 399 MEGA*fileSize); 400 } 401 main(String[] args)402 public static void main(String[] args) throws Exception { 403 int res = ToolRunner.run(new TestDFSIO(), args); 404 System.exit(res); 405 } 406 analyzeResult( FileSystem fs, int testType, long execTime, String resFileName )407 private static void analyzeResult( FileSystem fs, 408 int testType, 409 long execTime, 410 String resFileName 411 ) throws IOException { 412 Path reduceFile; 413 if (testType == 
  private static void analyzeResult( FileSystem fs,
                                     int testType,
                                     long execTime,
                                     String resFileName
                                     ) throws IOException {
    Path reduceFile;
    if (testType == TEST_TYPE_WRITE)
      reduceFile = new Path(WRITE_DIR, "part-00000");
    else
      reduceFile = new Path(READ_DIR, "part-00000");
    DataInputStream in = new DataInputStream(fs.open(reduceFile));

    BufferedReader lines = new BufferedReader(new InputStreamReader(in));
    long tasks = 0;
    long size = 0;
    long time = 0;
    float rate = 0;
    float sqrate = 0;
    String line;
    while((line = lines.readLine()) != null) {
      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
      String attr = tokens.nextToken();
      if (attr.endsWith(":tasks"))
        tasks = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith(":size"))
        size = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith(":time"))
        time = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith(":rate"))
        rate = Float.parseFloat(tokens.nextToken());
      else if (attr.endsWith(":sqrate"))
        sqrate = Float.parseFloat(tokens.nextToken());
    }

    double med = rate / 1000 / tasks;
    double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
    String[] resultLines = {
      "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
                                     (testType == TEST_TYPE_READ) ? "read" :
                                     "unknown"),
      "           Date & time: " + new Date(System.currentTimeMillis()),
      "       Number of files: " + tasks,
      "Total MBytes processed: " + size/MEGA,
      "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
      "Average IO rate mb/sec: " + med,
      " Std IO rate deviation: " + stdDev,
      "    Test exec time sec: " + (float)execTime / 1000,
      "" };

    PrintStream res = new PrintStream(
                                      new FileOutputStream(
                                                           new File(resFileName), true));
    for(int i = 0; i < resultLines.length; i++) {
      LOG.info(resultLines[i]);
      res.println(resultLines[i]);
    }
    res.close();
  }

  private static void cleanup(FileSystem fs) throws Exception {
    LOG.info("Cleaning up test files");
    fs.delete(new Path(TEST_ROOT_DIR), true);
    fs.delete(HDFS_TEST_DIR, true);
  }
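  // The -seq flag handled in run() below bypasses MapReduce and drives the
  // mappers directly in-process via sequentialTest(). An illustrative
  // invocation (in this mode only the elapsed time is logged; no result
  // file is written and no statistics are analyzed):
  //
  //   hadoop org.apache.hadoop.fs.DFSCIOTest -write -seq -nrFiles 2 -fileSize 10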
  @Override
  public int run(String[] args) throws Exception {
    int testType = TEST_TYPE_READ;
    int bufferSize = DEFAULT_BUFFER_SIZE;
    int fileSize = 1;
    int nrFiles = 1;
    String resFileName = DEFAULT_RES_FILE_NAME;
    boolean isSequential = false;

    String version="DFSCIOTest.0.0.1";
    String usage = "Usage: DFSCIOTest -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";

    System.out.println(version);
    if (args.length == 0) {
      System.err.println(usage);
      System.exit(-1);
    }
    for (int i = 0; i < args.length; i++) {       // parse command line
      if (args[i].startsWith("-r")) {
        testType = TEST_TYPE_READ;
      } else if (args[i].startsWith("-w")) {
        testType = TEST_TYPE_WRITE;
      } else if (args[i].startsWith("-clean")) {
        testType = TEST_TYPE_CLEANUP;
      } else if (args[i].startsWith("-seq")) {
        isSequential = true;
      } else if (args[i].equals("-nrFiles")) {
        nrFiles = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-fileSize")) {
        fileSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-bufferSize")) {
        bufferSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-resFile")) {
        resFileName = args[++i];
      }
    }

    LOG.info("nrFiles = " + nrFiles);
    LOG.info("fileSize (MB) = " + fileSize);
    LOG.info("bufferSize = " + bufferSize);

    try {
      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
      FileSystem fs = FileSystem.get(fsConfig);

      if (testType != TEST_TYPE_CLEANUP) {
        fs.delete(HDFS_TEST_DIR, true);
        if (!fs.mkdirs(HDFS_TEST_DIR)) {
          throw new IOException("Mkdirs failed to create " +
                                HDFS_TEST_DIR.toString());
        }

        // copy the executables over to the remote filesystem
        String hadoopHome = System.getenv("HADOOP_HOME");
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION),
                             HDFS_SHLIB);
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ);
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE);
      }

      if (isSequential) {
        long tStart = System.currentTimeMillis();
        sequentialTest(fs, testType, fileSize, nrFiles);
        long execTime = System.currentTimeMillis() - tStart;
        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
        LOG.info(resultLine);
        return 0;
      }
      if (testType == TEST_TYPE_CLEANUP) {
        cleanup(fs);
        return 0;
      }
      createControlFile(fs, fileSize, nrFiles);
      long tStart = System.currentTimeMillis();
      if (testType == TEST_TYPE_WRITE)
        writeTest(fs);
      if (testType == TEST_TYPE_READ)
        readTest(fs);
      long execTime = System.currentTimeMillis() - tStart;

      analyzeResult(fs, testType, execTime, resFileName);
    } catch(Exception e) {
      System.err.print(e.getLocalizedMessage());
      return -1;
    }
    return 0;
  }
}