1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
21 import java.io.IOException;
22 
23 import junit.extensions.TestSetup;
24 import junit.framework.Test;
25 import junit.framework.TestCase;
26 import junit.framework.TestSuite;
27 
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hdfs.MiniDFSCluster;
30 import org.apache.hadoop.io.BytesWritable;
31 import org.apache.hadoop.io.Text;
32 import org.apache.hadoop.mapred.lib.NullOutputFormat;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.util.ToolRunner;
36 import org.apache.hadoop.examples.RandomWriter;
37 import org.apache.hadoop.examples.Sort;
38 
/**
 * A JUnit test to test the Map-Reduce framework's sort
 * with a Mini Map-Reduce Cluster and a Mini HDFS Cluster.
 */
43 public class TestMiniMRDFSSort extends TestCase {
44   // Input/Output paths for sort
45   private static final Path SORT_INPUT_PATH = new Path("/sort/input");
46   private static final Path SORT_OUTPUT_PATH = new Path("/sort/output");
47 
48   // Knobs to control randomwriter; and hence sort
49   private static final int NUM_HADOOP_SLAVES = 3;
50   // make it big enough to cause a spill in the map
51   private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
52   private static final int RW_MAPS_PER_HOST = 2;
53 
54   private static MiniMRCluster mrCluster = null;
55   private static MiniDFSCluster dfsCluster = null;
56   private static FileSystem dfs = null;
suite()57   public static Test suite() {
58     TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) {
59       protected void setUp() throws Exception {
60         Configuration conf = new Configuration();
61         dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
62         dfs = dfsCluster.getFileSystem();
63         mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES,
64                                       dfs.getUri().toString(), 1);
65       }
66       protected void tearDown() throws Exception {
67         if (dfsCluster != null) { dfsCluster.shutdown(); }
68         if (mrCluster != null) { mrCluster.shutdown(); }
69       }
70     };
71     return setup;
72   }
73 
runRandomWriter(JobConf job, Path sortInput)74   public static void runRandomWriter(JobConf job, Path sortInput)
75   throws Exception {
76     // Scale down the default settings for RandomWriter for the test-case
77     // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP
78     job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
79     job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
80     String[] rwArgs = {sortInput.toString()};
81 
82     // Run RandomWriter
83     assertEquals(ToolRunner.run(job, new RandomWriter(), rwArgs), 0);
84   }
85 
runSort(JobConf job, Path sortInput, Path sortOutput)86   private static void runSort(JobConf job, Path sortInput, Path sortOutput)
87   throws Exception {
88 
89     job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
90     job.setInt("io.sort.mb", 1);
91     job.setNumMapTasks(12);
92 
93     // Setup command-line arguments to 'sort'
94     String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
95 
96     // Run Sort
97     Sort sort = new Sort();
98     assertEquals(ToolRunner.run(job, sort, sortArgs), 0);
99     Counters counters = sort.getResult().getCounters();
100     long mapInput = counters.findCounter(Task.Counter.MAP_INPUT_BYTES
101     ).getValue();
102     long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
103                                          "HDFS_BYTES_READ").getValue();
104     // the hdfs read should be between 100% and 110% of the map input bytes
105     assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead,
106                (hdfsRead < (mapInput * 1.1)) &&
107                (hdfsRead > mapInput));
108   }
109 
runSortValidator(JobConf job, Path sortInput, Path sortOutput)110   private static void runSortValidator(JobConf job,
111                                        Path sortInput, Path sortOutput)
112   throws Exception {
113     String[] svArgs = {"-sortInput", sortInput.toString(),
114                        "-sortOutput", sortOutput.toString()};
115 
116     // Run Sort-Validator
117     assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
118   }
119 
120   private static class ReuseDetector extends MapReduceBase
121       implements Mapper<BytesWritable,BytesWritable, Text, Text> {
122     static int instances = 0;
123     Reporter reporter = null;
124 
125     @Override
map(BytesWritable key, BytesWritable value, OutputCollector<Text, Text> output, Reporter reporter)126     public void map(BytesWritable key, BytesWritable value,
127                     OutputCollector<Text, Text> output,
128                     Reporter reporter) throws IOException {
129       this.reporter = reporter;
130     }
131 
close()132     public void close() throws IOException {
133       reporter.incrCounter("jvm", "use", ++instances);
134     }
135   }
136 
runJvmReuseTest(JobConf job, boolean reuse)137   private static void runJvmReuseTest(JobConf job,
138                                       boolean reuse) throws IOException {
139     // setup a map-only job that reads the input and only sets the counters
140     // based on how many times the jvm was reused.
141     job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
142     FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
143     job.setInputFormat(SequenceFileInputFormat.class);
144     job.setOutputFormat(NullOutputFormat.class);
145     job.setMapperClass(ReuseDetector.class);
146     job.setOutputKeyClass(Text.class);
147     job.setOutputValueClass(Text.class);
148     job.setNumMapTasks(24);
149     job.setNumReduceTasks(0);
150     RunningJob result = JobClient.runJob(job);
151     long uses = result.getCounters().findCounter("jvm", "use").getValue();
152     int maps = job.getNumMapTasks();
153     if (reuse) {
154       assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
155     } else {
156       assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
157     }
158   }
159 
testMapReduceSort()160   public void testMapReduceSort() throws Exception {
161     // Run randomwriter to generate input for 'sort'
162     runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
163 
164     // Run sort
165     runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
166 
167     // Run sort-validator to check if sort worked correctly
168     runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH,
169                      SORT_OUTPUT_PATH);
170     // test JVM reuse
171     runJvmReuseTest(mrCluster.createJobConf(), true);
172 
173     // test no JVM reuse
174     runJvmReuseTest(mrCluster.createJobConf(), false);
175   }
176 }
177