/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

/**
 * Distributed i/o benchmark.
 * <p>
 * This test writes into or reads from a specified number of files.
 * File size is specified as a parameter to the test.
 * Each file is accessed in a separate map task.
 * <p>
 * The reducer collects the following statistics:
 * <ul>
 * <li>number of tasks completed</li>
 * <li>number of bytes written/read</li>
 * <li>execution time</li>
 * <li>i/o rate</li>
 * <li>i/o rate squared</li>
 * </ul>
 *
 * Finally, the following information is appended to a local file:
 * <ul>
 * <li>read or write test</li>
 * <li>date and time the test finished</li>
 * <li>number of files</li>
 * <li>total number of bytes processed</li>
 * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
 * <li>average i/o rate in mb/sec per file</li>
 * <li>standard deviation of the i/o rate</li>
 * </ul>
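 * <p>
 * One plausible invocation, matching the options parsed in {@link #run}
 * (the launch command, file count, and file size are illustrative only):
 * <pre>
 * hadoop org.apache.hadoop.fs.DFSCIOTest -write -nrFiles 10 -fileSize 10
 * hadoop org.apache.hadoop.fs.DFSCIOTest -read -nrFiles 10 -fileSize 10
 * hadoop org.apache.hadoop.fs.DFSCIOTest -clean
 * </pre>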
 */
public class DFSCIOTest extends Configured implements Tool {
  // Constants
  private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);
  private static final int TEST_TYPE_READ = 0;
  private static final int TEST_TYPE_WRITE = 1;
  private static final int TEST_TYPE_CLEANUP = 2;
  private static final int DEFAULT_BUFFER_SIZE = 1000000;
  private static final String BASE_FILE_NAME = "test_io_";
  private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";

  private static Configuration fsConfig = new Configuration();
  private static final long MEGA = 0x100000;
  private static String TEST_ROOT_DIR = System.getProperty("test.build.data", "/benchmarks/DFSCIOTest");
  private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
  private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
  private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");

  private static Path HDFS_TEST_DIR = new Path("/tmp/DFSCIOTest");
  private static String HDFS_LIB_VERSION = System.getProperty("libhdfs.version", "1");
  private static final String CHMOD = "chmod";
  private static Path HDFS_SHLIB = new Path(HDFS_TEST_DIR + "/libhdfs.so." + HDFS_LIB_VERSION);
  private static Path HDFS_READ = new Path(HDFS_TEST_DIR + "/hdfs_read");
  private static Path HDFS_WRITE = new Path(HDFS_TEST_DIR + "/hdfs_write");

  /**
   * Run the test with default parameters.
   *
   * @throws Exception if the benchmark fails
   */
  @Test
  public void testIOs() throws Exception {
    testIOs(10, 10);
  }

  /**
   * Run the test with the specified parameters.
   *
   * @param fileSize file size in MB
   * @param nrFiles number of files
   * @throws IOException if the benchmark fails
   */
  public static void testIOs(int fileSize, int nrFiles)
    throws IOException {

    FileSystem fs = FileSystem.get(fsConfig);

    createControlFile(fs, fileSize, nrFiles);
    writeTest(fs);
    readTest(fs);
  }

  private static void createControlFile(
                                        FileSystem fs,
                                        int fileSize, // in MB
                                        int nrFiles
                                        ) throws IOException {
    LOG.info("creating control file: " + fileSize + " megabytes, " + nrFiles + " files");

    fs.delete(CONTROL_DIR, true);

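    // One control file per map task: each SequenceFile below holds a single
    // (file name -> size in MB) record, so every benchmark file gets its own
    // mapper.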
    for(int i = 0; i < nrFiles; i++) {
      String name = getFileName(i);
      Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
                                           Text.class, LongWritable.class,
                                           CompressionType.NONE);
        writer.append(new Text(name), new LongWritable(fileSize));
      } catch(Exception e) {
        throw new IOException(e.getLocalizedMessage());
      } finally {
        if (writer != null)
          writer.close();
      }
    }
    LOG.info("created control files for: " + nrFiles + " files");
  }

  private static String getFileName(int fIdx) {
    return BASE_FILE_NAME + Integer.toString(fIdx);
  }

  /**
   * Write/Read mapper base class.
   * <p>
   * Collects the following statistics per task:
   * <ul>
   * <li>number of tasks completed</li>
   * <li>number of bytes written/read</li>
   * <li>execution time</li>
   * <li>i/o rate</li>
   * <li>i/o rate squared</li>
   * </ul>
   */
  private abstract static class IOStatMapper extends IOMapperBase<Long> {
    IOStatMapper() {
    }

    void collectStats(OutputCollector<Text, Text> output,
                      String name,
                      long execTime,
                      Long objSize) throws IOException {
      long totalSize = objSize.longValue();
      float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
      LOG.info("Number of bytes processed = " + totalSize);
      LOG.info("Exec time = " + execTime);
      LOG.info("IO rate = " + ioRateMbSec);

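      // Emit one record per statistic; AccumulatingReducer sums values by key.
      // Rates are emitted scaled by 1000, and analyzeResult divides the
      // scaling back out after summing.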
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
          new Text(String.valueOf(1)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
          new Text(String.valueOf(totalSize)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
          new Text(String.valueOf(execTime)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
          new Text(String.valueOf(ioRateMbSec * 1000)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
          new Text(String.valueOf(ioRateMbSec * ioRateMbSec * 1000)));
    }
  }

  /**
   * Write mapper class.
   */
  public static class WriteMapper extends IOStatMapper {

    public WriteMapper() {
      super();
      for(int i = 0; i < bufferSize; i++)
        buffer[i] = (byte)('0' + i % 50);
    }

    public Long doIO(Reporter reporter,
                     String name,
                     long totalSize
                     ) throws IOException {
      // create file
      totalSize *= MEGA;

      // create instance of local filesystem
      FileSystem localFS = FileSystem.getLocal(fsConfig);

      try {
        // native runtime
        Runtime runTime = Runtime.getRuntime();

        // copy the dso and executable from dfs and chmod them
        synchronized (this) {
          localFS.delete(HDFS_TEST_DIR, true);
          if (!(localFS.mkdirs(HDFS_TEST_DIR))) {
            throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem");
          }
        }

        synchronized (this) {
          if (!localFS.exists(HDFS_SHLIB)) {
            if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) {
              throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem");
            }

            String chmodCmd = CHMOD + " a+x " + HDFS_SHLIB;
            Process process = runTime.exec(chmodCmd);
            int exitStatus = process.waitFor();
            if (exitStatus != 0) {
              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
            }
          }
        }

        synchronized (this) {
          if (!localFS.exists(HDFS_WRITE)) {
            if (!FileUtil.copy(fs, HDFS_WRITE, localFS, HDFS_WRITE, false, fsConfig)) {
              throw new IOException("Failed to copy " + HDFS_WRITE + " to local filesystem");
            }

            String chmodCmd = CHMOD + " a+x " + HDFS_WRITE;
            Process process = runTime.exec(chmodCmd);
            int exitStatus = process.waitFor();
            if (exitStatus != 0) {
              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
            }
          }
        }

        // exec the C program
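        // By the argument order assembled below, hdfs_write is expected to
        // write totalSize bytes to outFile through libhdfs using
        // bufferSize-byte buffers (inferred from the command string, not
        // from the helper's source).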
        Path outFile = new Path(DATA_DIR, name);
        String writeCmd = HDFS_WRITE + " " + outFile + " " + totalSize + " " + bufferSize;
        Process process = runTime.exec(writeCmd, null, new File(HDFS_TEST_DIR.toString()));
        int exitStatus = process.waitFor();
        if (exitStatus != 0) {
          throw new IOException(writeCmd + ": Failed with exitStatus: " + exitStatus);
        }
      } catch (InterruptedException interruptedException) {
        reporter.setStatus(interruptedException.toString());
      } finally {
        localFS.close();
      }
      return Long.valueOf(totalSize);
    }
  }

  private static void writeTest(FileSystem fs)
    throws IOException {

    fs.delete(DATA_DIR, true);
    fs.delete(WRITE_DIR, true);

    runIOTest(WriteMapper.class, WRITE_DIR);
  }

  private static void runIOTest( Class<? extends Mapper> mapperClass,
                                 Path outputDir
                                 ) throws IOException {
    JobConf job = new JobConf(fsConfig, DFSCIOTest.class);

    FileInputFormat.setInputPaths(job, CONTROL_DIR);
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(mapperClass);
    job.setReducerClass(AccumulatingReducer.class);

    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
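    // A single reducer funnels all statistics into one part-00000 file,
    // which analyzeResult parses after the job completes.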
    job.setNumReduceTasks(1);
    JobClient.runJob(job);
  }

  /**
   * Read mapper class.
   */
  public static class ReadMapper extends IOStatMapper {

    public ReadMapper() {
      super();
    }

    public Long doIO(Reporter reporter,
                     String name,
                     long totalSize
                     ) throws IOException {
      totalSize *= MEGA;

      // create instance of local filesystem
      FileSystem localFS = FileSystem.getLocal(fsConfig);

      try {
        // native runtime
        Runtime runTime = Runtime.getRuntime();

        // copy the dso and executable from dfs
        synchronized (this) {
          localFS.delete(HDFS_TEST_DIR, true);
          if (!(localFS.mkdirs(HDFS_TEST_DIR))) {
            throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem");
          }
        }

        synchronized (this) {
          if (!localFS.exists(HDFS_SHLIB)) {
            if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) {
              throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem");
            }

            String chmodCmd = CHMOD + " a+x " + HDFS_SHLIB;
            Process process = runTime.exec(chmodCmd);
            int exitStatus = process.waitFor();
            if (exitStatus != 0) {
              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
            }
          }
        }

        synchronized (this) {
          if (!localFS.exists(HDFS_READ)) {
            if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) {
              throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem");
            }

            String chmodCmd = CHMOD + " a+x " + HDFS_READ;
            Process process = runTime.exec(chmodCmd);
            int exitStatus = process.waitFor();

            if (exitStatus != 0) {
              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
            }
          }
        }

        // exec the C program
        Path inFile = new Path(DATA_DIR, name);
        String readCmd = HDFS_READ + " " + inFile + " " + totalSize + " " + bufferSize;
        Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString()));
        int exitStatus = process.waitFor();

        if (exitStatus != 0) {
          throw new IOException(readCmd + ": Failed with exitStatus: " + exitStatus);
        }
      } catch (InterruptedException interruptedException) {
        reporter.setStatus(interruptedException.toString());
      } finally {
        localFS.close();
      }
      return Long.valueOf(totalSize);
    }
  }

  private static void readTest(FileSystem fs) throws IOException {
    fs.delete(READ_DIR, true);
    runIOTest(ReadMapper.class, READ_DIR);
  }

  private static void sequentialTest(
                                     FileSystem fs,
                                     int testType,
                                     int fileSize,
                                     int nrFiles
                                     ) throws Exception {
    IOStatMapper ioer = null;
    if (testType == TEST_TYPE_READ)
      ioer = new ReadMapper();
    else if (testType == TEST_TYPE_WRITE)
      ioer = new WriteMapper();
    else
      return;
    for(int i = 0; i < nrFiles; i++)
      ioer.doIO(Reporter.NULL,
                BASE_FILE_NAME + Integer.toString(i),
                MEGA * fileSize);
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new DFSCIOTest(), args);
    System.exit(res);
  }

  private static void analyzeResult( FileSystem fs,
                                     int testType,
                                     long execTime,
                                     String resFileName
                                     ) throws IOException {
    Path reduceFile;
    if (testType == TEST_TYPE_WRITE)
      reduceFile = new Path(WRITE_DIR, "part-00000");
    else
      reduceFile = new Path(READ_DIR, "part-00000");
    DataInputStream in = new DataInputStream(fs.open(reduceFile));

    BufferedReader lines = new BufferedReader(new InputStreamReader(in));
    long tasks = 0;
    long size = 0;
    long time = 0;
    float rate = 0;
    float sqrate = 0;
    String line;
    while((line = lines.readLine()) != null) {
      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
      String attr = tokens.nextToken();
      if (attr.endsWith(":tasks"))
        tasks = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith(":size"))
        size = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith(":time"))
        time = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith(":rate"))
        rate = Float.parseFloat(tokens.nextToken());
      else if (attr.endsWith(":sqrate"))
        sqrate = Float.parseFloat(tokens.nextToken());
    }

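    // Recover the mean and standard deviation of the per-file i/o rates
    // from the accumulated sums: med = E[x], stdDev = sqrt(E[x^2] - E[x]^2).
    // The / 1000 undoes the scaling applied in collectStats; Math.abs guards
    // against a slightly negative variance caused by floating-point rounding.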
    double med = rate / 1000 / tasks;
    double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
    String resultLines[] = {
      "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
                                     (testType == TEST_TYPE_READ) ? "read" :
                                     "unknown"),
      "           Date & time: " + new Date(System.currentTimeMillis()),
      "       Number of files: " + tasks,
      "Total MBytes processed: " + size/MEGA,
      "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
      "Average IO rate mb/sec: " + med,
      " Std IO rate deviation: " + stdDev,
      "    Test exec time sec: " + (float)execTime / 1000,
      "" };

    PrintStream res = new PrintStream(
                                      new FileOutputStream(
                                                           new File(resFileName), true));
    for(int i = 0; i < resultLines.length; i++) {
      LOG.info(resultLines[i]);
      res.println(resultLines[i]);
    }
    res.close();
  }

  private static void cleanup(FileSystem fs) throws Exception {
    LOG.info("Cleaning up test files");
    fs.delete(new Path(TEST_ROOT_DIR), true);
    fs.delete(HDFS_TEST_DIR, true);
  }

  @Override
  public int run(String[] args) throws Exception {
    int testType = TEST_TYPE_READ;
    int bufferSize = DEFAULT_BUFFER_SIZE;
    int fileSize = 1;
    int nrFiles = 1;
    String resFileName = DEFAULT_RES_FILE_NAME;
    boolean isSequential = false;

    String version = "DFSCIOTest.0.0.1";
    String usage = "Usage: DFSCIOTest -read | -write | -clean [-seq] [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes]";

    System.out.println(version);
    if (args.length == 0) {
      System.err.println(usage);
      System.exit(-1);
    }
    for (int i = 0; i < args.length; i++) {       // parse command line
      // Exact-match options go first so that -resFile is not swallowed
      // by the -r prefix check below.
      if (args[i].equals("-nrFiles")) {
        nrFiles = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-fileSize")) {
        fileSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-bufferSize")) {
        bufferSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-resFile")) {
        resFileName = args[++i];
      } else if (args[i].startsWith("-r")) {
        testType = TEST_TYPE_READ;
      } else if (args[i].startsWith("-w")) {
        testType = TEST_TYPE_WRITE;
      } else if (args[i].startsWith("-clean")) {
        testType = TEST_TYPE_CLEANUP;
      } else if (args[i].startsWith("-seq")) {
        isSequential = true;
      }
    }

    LOG.info("nrFiles = " + nrFiles);
    LOG.info("fileSize (MB) = " + fileSize);
    LOG.info("bufferSize = " + bufferSize);

    try {
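      // IOMapperBase presumably reads this key on the task side to size its
      // i/o buffer (an assumption based on the shared TestDFSIO helpers, not
      // verified here).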
      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
      FileSystem fs = FileSystem.get(fsConfig);

      if (testType != TEST_TYPE_CLEANUP) {
        fs.delete(HDFS_TEST_DIR, true);
        if (!fs.mkdirs(HDFS_TEST_DIR)) {
          throw new IOException("Mkdirs failed to create " +
                                HDFS_TEST_DIR.toString());
        }

        // Copy the executables over to the remote filesystem
        String hadoopHome = System.getenv("HADOOP_HOME");
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION),
                             HDFS_SHLIB);
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ);
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE);
      }

      if (isSequential) {
        long tStart = System.currentTimeMillis();
        sequentialTest(fs, testType, fileSize, nrFiles);
        long execTime = System.currentTimeMillis() - tStart;
        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
        LOG.info(resultLine);
        return 0;
      }
      if (testType == TEST_TYPE_CLEANUP) {
        cleanup(fs);
        return 0;
      }
      createControlFile(fs, fileSize, nrFiles);
      long tStart = System.currentTimeMillis();
      if (testType == TEST_TYPE_WRITE)
        writeTest(fs);
      if (testType == TEST_TYPE_READ)
        readTest(fs);
      long execTime = System.currentTimeMillis() - tStart;

      analyzeResult(fs, testType, execTime, resFileName);
    } catch(Exception e) {
      System.err.println(e.getLocalizedMessage());
      return -1;
    }
    return 0;
  }
}