/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Date;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Distributed i/o benchmark.
 * <p>
 * This test writes into or reads from a specified number of files.
 * Number of bytes to write or read is specified as a parameter to the test.
 * Each file is accessed in a separate map task.
 * <p>
 * The reducer collects the following statistics:
 * <ul>
 * <li>number of tasks completed</li>
 * <li>number of bytes written/read</li>
 * <li>execution time</li>
 * <li>io rate</li>
 * <li>io rate squared</li>
 * </ul>
 *
 * Finally, the following information is appended to a local file
 * <ul>
 * <li>read or write test</li>
 * <li>date and time the test finished</li>
 * <li>number of files</li>
 * <li>total number of bytes processed</li>
 * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
 * <li>average i/o rate in mb/sec per file</li>
 * <li>standard deviation of i/o rate </li>
 * </ul>
 */
public class TestDFSIO implements Tool {
  // Constants
  private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
  private static final int DEFAULT_BUFFER_SIZE = 1000000;
  private static final String BASE_FILE_NAME = "test_io_";
  private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
  private static final long MEGA = ByteMultiple.MB.value();
  private static final int DEFAULT_NR_BYTES = 128;
  private static final int DEFAULT_NR_FILES = 4;
  private static final String USAGE =
                    "Usage: " + TestDFSIO.class.getSimpleName() +
                    " [genericOptions]" +
                    " -read [-random | -backward | -skip [-skipSize Size]] |" +
                    " -write | -append | -truncate | -clean" +
                    " [-compression codecClassName]" +
                    " [-nrFiles N]" +
                    " [-size Size[B|KB|MB|GB|TB]]" +
                    " [-resFile resultFileName] [-bufferSize Bytes]" +
                    " [-rootDir]";
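
  // Example command-line invocation (illustrative; the test jar name and
  // exact path vary across Hadoop versions and builds):
  //
  //   hadoop jar hadoop-*-tests.jar TestDFSIO -write -nrFiles 4 -size 128MB
  //   hadoop jar hadoop-*-tests.jar TestDFSIO -read  -nrFiles 4 -size 128MB
  //   hadoop jar hadoop-*-tests.jar TestDFSIO -clean
  //
  // Run -write first: the read, append and truncate tests expect the files
  // created by a previous -write run with the same -nrFiles and -size.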

  private Configuration config;

  static{
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
  }

  private static enum TestType {
    TEST_TYPE_READ("read"),
    TEST_TYPE_WRITE("write"),
    TEST_TYPE_CLEANUP("cleanup"),
    TEST_TYPE_APPEND("append"),
    TEST_TYPE_READ_RANDOM("random read"),
    TEST_TYPE_READ_BACKWARD("backward read"),
    TEST_TYPE_READ_SKIP("skip read"),
    TEST_TYPE_TRUNCATE("truncate");

    private String type;

    private TestType(String t) {
      type = t;
    }

    @Override // Object
    public String toString() {
      return type;
    }
  }

  static enum ByteMultiple {
    B(1L),
    KB(0x400L),
    MB(0x100000L),
    GB(0x40000000L),
    TB(0x10000000000L);

    private long multiplier;

    private ByteMultiple(long mult) {
      multiplier = mult;
    }

    long value() {
      return multiplier;
    }

    static ByteMultiple parseString(String sMultiple) {
      if(sMultiple == null || sMultiple.isEmpty()) // MB by default
        return MB;
      String sMU = StringUtils.toUpperCase(sMultiple);
      if(StringUtils.toUpperCase(B.name()).endsWith(sMU))
        return B;
      if(StringUtils.toUpperCase(KB.name()).endsWith(sMU))
        return KB;
      if(StringUtils.toUpperCase(MB.name()).endsWith(sMU))
        return MB;
      if(StringUtils.toUpperCase(GB.name()).endsWith(sMU))
        return GB;
      if(StringUtils.toUpperCase(TB.name()).endsWith(sMU))
        return TB;
      throw new IllegalArgumentException("Unsupported ByteMultiple "+sMultiple);
    }
  }
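
  // Illustrative parseString results (matching is case-insensitive and the
  // given string must match the end of a unit name):
  //   ByteMultiple.parseString("MB") == ByteMultiple.MB   // 0x100000 bytes
  //   ByteMultiple.parseString("kb") == ByteMultiple.KB   // case-insensitive
  //   ByteMultiple.parseString("")   == ByteMultiple.MB   // MB by default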

  public TestDFSIO() {
    this.config = new Configuration();
  }

  private static String getBaseDir(Configuration conf) {
    return conf.get("test.build.data","/benchmarks/TestDFSIO");
  }
  private static Path getControlDir(Configuration conf) {
    return new Path(getBaseDir(conf), "io_control");
  }
  private static Path getWriteDir(Configuration conf) {
    return new Path(getBaseDir(conf), "io_write");
  }
  private static Path getReadDir(Configuration conf) {
    return new Path(getBaseDir(conf), "io_read");
  }
  private static Path getAppendDir(Configuration conf) {
    return new Path(getBaseDir(conf), "io_append");
  }
  private static Path getRandomReadDir(Configuration conf) {
    return new Path(getBaseDir(conf), "io_random_read");
  }
  private static Path getTruncateDir(Configuration conf) {
    return new Path(getBaseDir(conf), "io_truncate");
  }
  private static Path getDataDir(Configuration conf) {
    return new Path(getBaseDir(conf), "io_data");
  }

  private static MiniDFSCluster cluster;
  private static TestDFSIO bench;

  @BeforeClass
  public static void beforeClass() throws Exception {
    bench = new TestDFSIO();
    bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
    bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    cluster = new MiniDFSCluster.Builder(bench.getConf())
                                .numDataNodes(2)
                                .format(true)
                                .build();
    FileSystem fs = cluster.getFileSystem();
    bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);

    // Check write here, as it is required for other tests
    testWrite();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    if(cluster == null)
      return;
    FileSystem fs = cluster.getFileSystem();
    bench.cleanup(fs);
    cluster.shutdown();
  }

  public static void testWrite() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    long tStart = System.currentTimeMillis();
    bench.writeTest(fs);
    long execTime = System.currentTimeMillis() - tStart;
    bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime);
  }

  @Test (timeout = 3000)
  public void testRead() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    long tStart = System.currentTimeMillis();
    bench.readTest(fs);
    long execTime = System.currentTimeMillis() - tStart;
    bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime);
  }

  @Test (timeout = 3000)
  public void testReadRandom() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    long tStart = System.currentTimeMillis();
    bench.getConf().setLong("test.io.skip.size", 0);
    bench.randomReadTest(fs);
    long execTime = System.currentTimeMillis() - tStart;
    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime);
  }

  @Test (timeout = 3000)
  public void testReadBackward() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    long tStart = System.currentTimeMillis();
    bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE);
    bench.randomReadTest(fs);
    long execTime = System.currentTimeMillis() - tStart;
    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime);
  }

  @Test (timeout = 3000)
  public void testReadSkip() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    long tStart = System.currentTimeMillis();
    bench.getConf().setLong("test.io.skip.size", 1);
    bench.randomReadTest(fs);
    long execTime = System.currentTimeMillis() - tStart;
    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
  }

  @Test (timeout = 6000)
  public void testAppend() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    long tStart = System.currentTimeMillis();
    bench.appendTest(fs);
    long execTime = System.currentTimeMillis() - tStart;
    bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
  }

  @Test (timeout = 60000)
  public void testTruncate() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    bench.createControlFile(fs, DEFAULT_NR_BYTES / 2, DEFAULT_NR_FILES);
    long tStart = System.currentTimeMillis();
    bench.truncateTest(fs);
    long execTime = System.currentTimeMillis() - tStart;
    bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime);
  }

  @SuppressWarnings("deprecation")
  private void createControlFile(FileSystem fs,
                                  long nrBytes, // in bytes
                                  int nrFiles
                                ) throws IOException {
    LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files");

    Path controlDir = getControlDir(config);
    fs.delete(controlDir, true);

    for(int i=0; i < nrFiles; i++) {
      String name = getFileName(i);
      Path controlFile = new Path(controlDir, "in_file_" + name);
      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(fs, config, controlFile,
                                           Text.class, LongWritable.class,
                                           CompressionType.NONE);
        writer.append(new Text(name), new LongWritable(nrBytes));
      } catch(Exception e) {
        throw new IOException(e.getLocalizedMessage());
      } finally {
        if (writer != null)
          writer.close();
        writer = null;
      }
    }
    LOG.info("created control files for: "+nrFiles+" files");
  }

  private static String getFileName(int fIdx) {
    return BASE_FILE_NAME + Integer.toString(fIdx);
  }

  /**
   * Write/Read mapper base class.
   * <p>
   * Collects the following statistics per task:
   * <ul>
   * <li>number of tasks completed</li>
   * <li>number of bytes written/read</li>
   * <li>execution time</li>
   * <li>i/o rate</li>
   * <li>i/o rate squared</li>
   * </ul>
   */
  private abstract static class IOStatMapper extends IOMapperBase<Long> {
    protected CompressionCodec compressionCodec;

    IOStatMapper() {
    }

    @Override // Mapper
    public void configure(JobConf conf) {
      super.configure(conf);

      // grab compression
      String compression = getConf().get("test.io.compression.class", null);
      Class<? extends CompressionCodec> codec;

      // try to initialize codec
      try {
        codec = (compression == null) ? null :
          Class.forName(compression).asSubclass(CompressionCodec.class);
      } catch(Exception e) {
        throw new RuntimeException("Compression codec not found: ", e);
      }

      if(codec != null) {
        compressionCodec = (CompressionCodec)
            ReflectionUtils.newInstance(codec, getConf());
      }
    }

    @Override // IOMapperBase
    void collectStats(OutputCollector<Text, Text> output,
                      String name,
                      long execTime,
                      Long objSize) throws IOException {
      long totalSize = objSize.longValue();
      float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
      LOG.info("Number of bytes processed = " + totalSize);
      LOG.info("Exec time = " + execTime);
      LOG.info("IO rate = " + ioRateMbSec);

      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
          new Text(String.valueOf(1)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
          new Text(String.valueOf(totalSize)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
          new Text(String.valueOf(execTime)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
          new Text(String.valueOf(ioRateMbSec*1000)));
      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
          new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
    }
  }
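
  // Worked example for collectStats, under the default MEGA = 0x100000:
  // a task that processes 128 MB in 2000 ms reports
  //   ioRateMbSec = (128 * MEGA) * 1000 / (2000 * MEGA) = 64 MB/s.
  // "rate" and "sqrate" are emitted multiplied by 1000; analyzeResult()
  // divides by 1000 again when deriving the average rate and its standard
  // deviation, so the two scalings cancel out.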

  /**
   * Write mapper class.
   */
  public static class WriteMapper extends IOStatMapper {

    public WriteMapper() {
      for(int i=0; i < bufferSize; i++)
        buffer[i] = (byte)('0' + i % 50);
    }

    @Override // IOMapperBase
    public Closeable getIOStream(String name) throws IOException {
      // create file
      OutputStream out =
          fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
      if(compressionCodec != null)
        out = compressionCodec.createOutputStream(out);
      LOG.info("out = " + out.getClass().getName());
      return out;
    }

    @Override // IOMapperBase
    public Long doIO(Reporter reporter,
                       String name,
                       long totalSize // in bytes
                     ) throws IOException {
      OutputStream out = (OutputStream)this.stream;
      // write to the file
      long nrRemaining;
      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
        out.write(buffer, 0, curSize);
        reporter.setStatus("writing " + name + "@" +
                           (totalSize - nrRemaining) + "/" + totalSize
                           + " ::host = " + hostName);
      }
      return Long.valueOf(totalSize);
    }
  }

  private void writeTest(FileSystem fs) throws IOException {
    Path writeDir = getWriteDir(config);
    fs.delete(getDataDir(config), true);
    fs.delete(writeDir, true);

    runIOTest(WriteMapper.class, writeDir);
  }

  private void runIOTest(
          Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
          Path outputDir) throws IOException {
    JobConf job = new JobConf(config, TestDFSIO.class);

    FileInputFormat.setInputPaths(job, getControlDir(config));
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(mapperClass);
    job.setReducerClass(AccumulatingReducer.class);

    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(1);
    JobClient.runJob(job);
  }

  /**
   * Append mapper class.
   */
  public static class AppendMapper extends IOStatMapper {

    public AppendMapper() {
      for(int i=0; i < bufferSize; i++)
        buffer[i] = (byte)('0' + i % 50);
    }

    @Override // IOMapperBase
    public Closeable getIOStream(String name) throws IOException {
      // open file for append
      OutputStream out =
          fs.append(new Path(getDataDir(getConf()), name), bufferSize);
      if(compressionCodec != null)
        out = compressionCodec.createOutputStream(out);
      LOG.info("out = " + out.getClass().getName());
      return out;
    }

    @Override // IOMapperBase
    public Long doIO(Reporter reporter,
                       String name,
                       long totalSize // in bytes
                     ) throws IOException {
      OutputStream out = (OutputStream)this.stream;
      // write to the file
      long nrRemaining;
      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
        out.write(buffer, 0, curSize);
        reporter.setStatus("writing " + name + "@" +
                           (totalSize - nrRemaining) + "/" + totalSize
                           + " ::host = " + hostName);
      }
      return Long.valueOf(totalSize);
    }
  }

  private void appendTest(FileSystem fs) throws IOException {
    Path appendDir = getAppendDir(config);
    fs.delete(appendDir, true);
    runIOTest(AppendMapper.class, appendDir);
  }

  /**
   * Read mapper class.
   */
  public static class ReadMapper extends IOStatMapper {

    public ReadMapper() {
    }

    @Override // IOMapperBase
    public Closeable getIOStream(String name) throws IOException {
      // open file
      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
      if(compressionCodec != null)
        in = compressionCodec.createInputStream(in);
      LOG.info("in = " + in.getClass().getName());
      return in;
    }

    @Override // IOMapperBase
    public Long doIO(Reporter reporter,
                       String name,
                       long totalSize // in bytes
                     ) throws IOException {
      InputStream in = (InputStream)this.stream;
      long actualSize = 0;
      while (actualSize < totalSize) {
        int curSize = in.read(buffer, 0, bufferSize);
        if(curSize < 0) break;
        actualSize += curSize;
        reporter.setStatus("reading " + name + "@" +
                           actualSize + "/" + totalSize
                           + " ::host = " + hostName);
      }
      return Long.valueOf(actualSize);
    }
  }

  private void readTest(FileSystem fs) throws IOException {
    Path readDir = getReadDir(config);
    fs.delete(readDir, true);
    runIOTest(ReadMapper.class, readDir);
  }

  /**
   * Mapper class for random reads.
   * The mapper chooses a position in the file and reads bufferSize
   * bytes starting at the chosen position.
   * It stops after reading the totalSize bytes, specified by -size.
   *
   * There are three types of reads.
   * 1) Random read always chooses a random position to read from: skipSize = 0
   * 2) Backward read reads the file in reverse order            : skipSize < 0
   * 3) Skip-read skips skipSize bytes after every read          : skipSize > 0
   */
  public static class RandomReadMapper extends IOStatMapper {
    private Random rnd;
    private long fileSize;
    private long skipSize;

    @Override // Mapper
    public void configure(JobConf conf) {
      super.configure(conf);
      skipSize = conf.getLong("test.io.skip.size", 0);
    }

    public RandomReadMapper() {
      rnd = new Random();
    }

    @Override // IOMapperBase
    public Closeable getIOStream(String name) throws IOException {
      Path filePath = new Path(getDataDir(getConf()), name);
      this.fileSize = fs.getFileStatus(filePath).getLen();
      InputStream in = fs.open(filePath);
      if(compressionCodec != null)
        in = new FSDataInputStream(compressionCodec.createInputStream(in));
      LOG.info("in = " + in.getClass().getName());
      LOG.info("skipSize = " + skipSize);
      return in;
    }

    @Override // IOMapperBase
    public Long doIO(Reporter reporter,
                       String name,
                       long totalSize // in bytes
                     ) throws IOException {
      PositionedReadable in = (PositionedReadable)this.stream;
      long actualSize = 0;
      for(long pos = nextOffset(-1);
          actualSize < totalSize; pos = nextOffset(pos)) {
        int curSize = in.read(pos, buffer, 0, bufferSize);
        if(curSize < 0) break;
        actualSize += curSize;
        reporter.setStatus("reading " + name + "@" +
                           actualSize + "/" + totalSize
                           + " ::host = " + hostName);
      }
      return Long.valueOf(actualSize);
    }

    /**
     * Get next offset for reading.
     * If current < 0 then choose the initial offset according to the
     * read type.
     *
     * @param current the current offset, or a negative value on the first call
     * @return the next offset to read from
     */
    private long nextOffset(long current) {
      if(skipSize == 0)
        return rnd.nextInt((int)(fileSize));
      if(skipSize > 0)
        return (current < 0) ? 0 : (current + bufferSize + skipSize);
      // skipSize < 0
      return (current < 0) ? Math.max(0, fileSize - bufferSize) :
                             Math.max(0, current + skipSize);
    }
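
    // Illustrative offset sequences for nextOffset (bufferSize = 1000000):
    //   skipSize == 0        : a fresh random offset in [0, fileSize)
    //   skipSize == 10       : 0, 1000010, 2000020, ... (read, skip, read)
    //   skipSize == -1000000 : fileSize-1000000, fileSize-2000000, ...
    //                          (buffer-sized reads, back to front)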
  }

  private void randomReadTest(FileSystem fs) throws IOException {
    Path readDir = getRandomReadDir(config);
    fs.delete(readDir, true);
    runIOTest(RandomReadMapper.class, readDir);
  }

  /**
   * Truncate mapper class.
   * The mapper truncates given file to the newLength, specified by -size.
   */
  public static class TruncateMapper extends IOStatMapper {
    private static final long DELAY = 100L;

    private Path filePath;
    private long fileSize;

    @Override // IOMapperBase
    public Closeable getIOStream(String name) throws IOException {
      filePath = new Path(getDataDir(getConf()), name);
      fileSize = fs.getFileStatus(filePath).getLen();
      return null;
    }

    @Override // IOMapperBase
    public Long doIO(Reporter reporter,
                       String name,
                       long newLength // in bytes
                     ) throws IOException {
      boolean isClosed = fs.truncate(filePath, newLength);
      reporter.setStatus("truncating " + name + " to newLength " +
          newLength  + " ::host = " + hostName);
      for(int i = 0; !isClosed; i++) {
        try {
          Thread.sleep(DELAY);
        } catch (InterruptedException ignored) {}
        FileStatus status = fs.getFileStatus(filePath);
        assert status != null : "status is null";
        isClosed = (status.getLen() == newLength);
        reporter.setStatus("truncate recover for " + name + " to newLength " +
            newLength + " attempt " + i + " ::host = " + hostName);
      }
      return Long.valueOf(fileSize - newLength);
    }
  }

  private void truncateTest(FileSystem fs) throws IOException {
    Path truncateDir = getTruncateDir(config);
    fs.delete(truncateDir, true);
    runIOTest(TruncateMapper.class, truncateDir);
  }

  private void sequentialTest(FileSystem fs,
                              TestType testType,
                              long fileSize, // in bytes
                              int nrFiles
                             ) throws IOException {
    IOStatMapper ioer = null;
    switch(testType) {
    case TEST_TYPE_READ:
      ioer = new ReadMapper();
      break;
    case TEST_TYPE_WRITE:
      ioer = new WriteMapper();
      break;
    case TEST_TYPE_APPEND:
      ioer = new AppendMapper();
      break;
    case TEST_TYPE_READ_RANDOM:
    case TEST_TYPE_READ_BACKWARD:
    case TEST_TYPE_READ_SKIP:
      ioer = new RandomReadMapper();
      break;
    case TEST_TYPE_TRUNCATE:
      ioer = new TruncateMapper();
      break;
    default:
      return;
    }
    for(int i=0; i < nrFiles; i++)
      ioer.doIO(Reporter.NULL,
                BASE_FILE_NAME+Integer.toString(i),
                fileSize);
  }

  public static void main(String[] args) {
    TestDFSIO bench = new TestDFSIO();
    int res = -1;
    try {
      res = ToolRunner.run(bench, args);
    } catch(Exception e) {
      System.err.print(StringUtils.stringifyException(e));
      res = -2;
    }
    if(res == -1)
      System.err.print(USAGE);
    System.exit(res);
  }

  @Override // Tool
  public int run(String[] args) throws IOException {
    TestType testType = null;
    int bufferSize = DEFAULT_BUFFER_SIZE;
    long nrBytes = 1*MEGA;
    int nrFiles = 1;
    long skipSize = 0;
    String resFileName = DEFAULT_RES_FILE_NAME;
    String compressionClass = null;
    boolean isSequential = false;
    String version = TestDFSIO.class.getSimpleName() + ".1.8";

    LOG.info(version);
    if (args.length == 0) {
      System.err.println("Missing arguments.");
      return -1;
    }

    for (int i = 0; i < args.length; i++) {       // parse command line
      if (args[i].startsWith("-read")) {
        testType = TestType.TEST_TYPE_READ;
      } else if (args[i].equals("-write")) {
        testType = TestType.TEST_TYPE_WRITE;
      } else if (args[i].equals("-append")) {
        testType = TestType.TEST_TYPE_APPEND;
      } else if (args[i].equals("-random")) {
        if(testType != TestType.TEST_TYPE_READ) return -1;
        testType = TestType.TEST_TYPE_READ_RANDOM;
      } else if (args[i].equals("-backward")) {
        if(testType != TestType.TEST_TYPE_READ) return -1;
        testType = TestType.TEST_TYPE_READ_BACKWARD;
      } else if (args[i].equals("-skip")) {
        if(testType != TestType.TEST_TYPE_READ) return -1;
        testType = TestType.TEST_TYPE_READ_SKIP;
      } else if (args[i].equalsIgnoreCase("-truncate")) {
        testType = TestType.TEST_TYPE_TRUNCATE;
      } else if (args[i].equals("-clean")) {
        testType = TestType.TEST_TYPE_CLEANUP;
      } else if (args[i].startsWith("-seq")) {
        isSequential = true;
      } else if (args[i].startsWith("-compression")) {
        compressionClass = args[++i];
      } else if (args[i].equals("-nrFiles")) {
        nrFiles = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
        nrBytes = parseSize(args[++i]);
      } else if (args[i].equals("-skipSize")) {
        skipSize = parseSize(args[++i]);
      } else if (args[i].equals("-bufferSize")) {
        bufferSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-resFile")) {
        resFileName = args[++i];
      } else {
        System.err.println("Illegal argument: " + args[i]);
        return -1;
      }
    }
    if(testType == null)
      return -1;
    if(testType == TestType.TEST_TYPE_READ_BACKWARD)
      skipSize = -bufferSize;
    else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
      skipSize = bufferSize;

    LOG.info("nrFiles = " + nrFiles);
    LOG.info("nrBytes (MB) = " + toMB(nrBytes));
    LOG.info("bufferSize = " + bufferSize);
    if(skipSize > 0)
      LOG.info("skipSize = " + skipSize);
    LOG.info("baseDir = " + getBaseDir(config));

    if(compressionClass != null) {
      config.set("test.io.compression.class", compressionClass);
      LOG.info("compressionClass = " + compressionClass);
    }

    config.setInt("test.io.file.buffer.size", bufferSize);
    config.setLong("test.io.skip.size", skipSize);
    config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
    FileSystem fs = FileSystem.get(config);

    if (isSequential) {
      long tStart = System.currentTimeMillis();
      sequentialTest(fs, testType, nrBytes, nrFiles);
      long execTime = System.currentTimeMillis() - tStart;
      String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
      LOG.info(resultLine);
      return 0;
    }
    if (testType == TestType.TEST_TYPE_CLEANUP) {
      cleanup(fs);
      return 0;
    }
    createControlFile(fs, nrBytes, nrFiles);
    long tStart = System.currentTimeMillis();
    switch(testType) {
    case TEST_TYPE_WRITE:
      writeTest(fs);
      break;
    case TEST_TYPE_READ:
      readTest(fs);
      break;
    case TEST_TYPE_APPEND:
      appendTest(fs);
      break;
    case TEST_TYPE_READ_RANDOM:
    case TEST_TYPE_READ_BACKWARD:
    case TEST_TYPE_READ_SKIP:
      randomReadTest(fs);
      break;
    case TEST_TYPE_TRUNCATE:
      truncateTest(fs);
      break;
    default:
    }
    long execTime = System.currentTimeMillis() - tStart;

    analyzeResult(fs, testType, execTime, resFileName);
    return 0;
  }

  @Override // Configurable
  public Configuration getConf() {
    return this.config;
  }

  @Override // Configurable
  public void setConf(Configuration conf) {
    this.config = conf;
  }

  /**
   * Returns size in bytes.
   *
   * @param arg a size string of the form {d}[B|KB|MB|GB|TB]
   * @return the size in bytes
   */
  static long parseSize(String arg) {
    String[] args = arg.split("\\D", 2);  // get digits
    assert args.length <= 2;
    long nrBytes = Long.parseLong(args[0]);
    String bytesMult = arg.substring(args[0].length()); // get byte multiple
    return nrBytes * ByteMultiple.parseString(bytesMult).value();
  }
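
  // Worked examples:
  //   parseSize("128MB") -> digits "128", multiple "MB"
  //                         -> 128 * 0x100000 = 134217728 bytes
  //   parseSize("1GB")   -> 1 * 0x40000000 = 1073741824 bytes
  //   parseSize("128")   -> 128 MB, since an empty multiple defaults to MB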

  static float toMB(long bytes) {
    return ((float)bytes)/MEGA;
  }

  private void analyzeResult( FileSystem fs,
                              TestType testType,
                              long execTime,
                              String resFileName
                            ) throws IOException {
    Path reduceFile = getReduceFilePath(testType);
    long tasks = 0;
    long size = 0;
    long time = 0;
    float rate = 0;
    float sqrate = 0;
    DataInputStream in = null;
    BufferedReader lines = null;
    try {
      in = new DataInputStream(fs.open(reduceFile));
      lines = new BufferedReader(new InputStreamReader(in));
      String line;
      while((line = lines.readLine()) != null) {
        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
        String attr = tokens.nextToken();
        if (attr.endsWith(":tasks"))
          tasks = Long.parseLong(tokens.nextToken());
        else if (attr.endsWith(":size"))
          size = Long.parseLong(tokens.nextToken());
        else if (attr.endsWith(":time"))
          time = Long.parseLong(tokens.nextToken());
        else if (attr.endsWith(":rate"))
          rate = Float.parseFloat(tokens.nextToken());
        else if (attr.endsWith(":sqrate"))
          sqrate = Float.parseFloat(tokens.nextToken());
      }
    } finally {
      if(in != null) in.close();
      if(lines != null) lines.close();
    }

    double med = rate / 1000 / tasks;
    double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
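
    // "med" is the arithmetic mean of the per-task i/o rates (rate and
    // sqrate arrive pre-scaled by 1000 from collectStats, hence the
    // division), and the deviation follows from
    //   stdDev = sqrt(E[x^2] - (E[x])^2),
    // with Math.abs guarding against small negative values caused by
    // float round-off.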
    String[] resultLines = {
      "----- TestDFSIO ----- : " + testType,
      "           Date & time: " + new Date(System.currentTimeMillis()),
      "       Number of files: " + tasks,
      "Total MBytes processed: " + toMB(size),
      "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
      "Average IO rate mb/sec: " + med,
      " IO rate std deviation: " + stdDev,
      "    Test exec time sec: " + (float)execTime / 1000,
      "" };

    PrintStream res = null;
    try {
      res = new PrintStream(new FileOutputStream(new File(resFileName), true));
      for(int i = 0; i < resultLines.length; i++) {
        LOG.info(resultLines[i]);
        res.println(resultLines[i]);
      }
    } finally {
      if(res != null) res.close();
    }
  }

  private Path getReduceFilePath(TestType testType) {
    switch(testType) {
    case TEST_TYPE_WRITE:
      return new Path(getWriteDir(config), "part-00000");
    case TEST_TYPE_APPEND:
      return new Path(getAppendDir(config), "part-00000");
    case TEST_TYPE_READ:
      return new Path(getReadDir(config), "part-00000");
    case TEST_TYPE_READ_RANDOM:
    case TEST_TYPE_READ_BACKWARD:
    case TEST_TYPE_READ_SKIP:
      return new Path(getRandomReadDir(config), "part-00000");
    case TEST_TYPE_TRUNCATE:
      return new Path(getTruncateDir(config), "part-00000");
    default:
    }
    return null;
  }

  private void analyzeResult(FileSystem fs, TestType testType, long execTime)
      throws IOException {
    String dir = System.getProperty("test.build.dir", "target/test-dir");
    analyzeResult(fs, testType, execTime, dir + "/" + DEFAULT_RES_FILE_NAME);
  }

  private void cleanup(FileSystem fs)
  throws IOException {
    LOG.info("Cleaning up test files");
    fs.delete(new Path(getBaseDir(config)), true);
  }
}