1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.IOException; 22 23 import junit.extensions.TestSetup; 24 import junit.framework.Test; 25 import junit.framework.TestCase; 26 import junit.framework.TestSuite; 27 28 import org.apache.hadoop.conf.Configuration; 29 import org.apache.hadoop.hdfs.MiniDFSCluster; 30 import org.apache.hadoop.io.BytesWritable; 31 import org.apache.hadoop.io.Text; 32 import org.apache.hadoop.mapred.lib.NullOutputFormat; 33 import org.apache.hadoop.fs.FileSystem; 34 import org.apache.hadoop.fs.Path; 35 import org.apache.hadoop.util.ToolRunner; 36 import org.apache.hadoop.examples.RandomWriter; 37 import org.apache.hadoop.examples.Sort; 38 39 /** 40 * A JUnit test to test the Map-Reduce framework's sort 41 * with a Mini Map-Reduce Cluster with a Mini HDFS Clusters. 42 */ 43 public class TestMiniMRDFSSort extends TestCase { 44 // Input/Output paths for sort 45 private static final Path SORT_INPUT_PATH = new Path("/sort/input"); 46 private static final Path SORT_OUTPUT_PATH = new Path("/sort/output"); 47 48 // Knobs to control randomwriter; and hence sort 49 private static final int NUM_HADOOP_SLAVES = 3; 50 // make it big enough to cause a spill in the map 51 private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024; 52 private static final int RW_MAPS_PER_HOST = 2; 53 54 private static MiniMRCluster mrCluster = null; 55 private static MiniDFSCluster dfsCluster = null; 56 private static FileSystem dfs = null; suite()57 public static Test suite() { 58 TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) { 59 protected void setUp() throws Exception { 60 Configuration conf = new Configuration(); 61 dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null); 62 dfs = dfsCluster.getFileSystem(); 63 mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES, 64 dfs.getUri().toString(), 1); 65 } 66 protected void tearDown() throws Exception { 67 if (dfsCluster != null) { dfsCluster.shutdown(); } 68 if (mrCluster != null) { mrCluster.shutdown(); } 69 } 70 }; 71 return setup; 72 } 73 runRandomWriter(JobConf job, Path sortInput)74 public static void runRandomWriter(JobConf job, Path sortInput) 75 throws Exception { 76 // Scale down the default settings for RandomWriter for the test-case 77 // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP 78 job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP); 79 job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST); 80 String[] rwArgs = {sortInput.toString()}; 81 82 // Run RandomWriter 83 assertEquals(ToolRunner.run(job, new RandomWriter(), rwArgs), 0); 84 } 85 runSort(JobConf job, Path sortInput, Path sortOutput)86 private static void runSort(JobConf job, Path sortInput, Path sortOutput) 87 throws Exception { 88 89 job.setInt("mapred.job.reuse.jvm.num.tasks", -1); 90 job.setInt("io.sort.mb", 1); 91 job.setNumMapTasks(12); 92 93 // Setup command-line arguments to 'sort' 94 String[] sortArgs = {sortInput.toString(), sortOutput.toString()}; 95 96 // Run Sort 97 Sort sort = new Sort(); 98 assertEquals(ToolRunner.run(job, sort, sortArgs), 0); 99 Counters counters = sort.getResult().getCounters(); 100 long mapInput = counters.findCounter(Task.Counter.MAP_INPUT_BYTES 101 ).getValue(); 102 long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 103 "HDFS_BYTES_READ").getValue(); 104 // the hdfs read should be between 100% and 110% of the map input bytes 105 assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead, 106 (hdfsRead < (mapInput * 1.1)) && 107 (hdfsRead > mapInput)); 108 } 109 runSortValidator(JobConf job, Path sortInput, Path sortOutput)110 private static void runSortValidator(JobConf job, 111 Path sortInput, Path sortOutput) 112 throws Exception { 113 String[] svArgs = {"-sortInput", sortInput.toString(), 114 "-sortOutput", sortOutput.toString()}; 115 116 // Run Sort-Validator 117 assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0); 118 } 119 120 private static class ReuseDetector extends MapReduceBase 121 implements Mapper<BytesWritable,BytesWritable, Text, Text> { 122 static int instances = 0; 123 Reporter reporter = null; 124 125 @Override map(BytesWritable key, BytesWritable value, OutputCollector<Text, Text> output, Reporter reporter)126 public void map(BytesWritable key, BytesWritable value, 127 OutputCollector<Text, Text> output, 128 Reporter reporter) throws IOException { 129 this.reporter = reporter; 130 } 131 close()132 public void close() throws IOException { 133 reporter.incrCounter("jvm", "use", ++instances); 134 } 135 } 136 runJvmReuseTest(JobConf job, boolean reuse)137 private static void runJvmReuseTest(JobConf job, 138 boolean reuse) throws IOException { 139 // setup a map-only job that reads the input and only sets the counters 140 // based on how many times the jvm was reused. 141 job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1); 142 FileInputFormat.setInputPaths(job, SORT_INPUT_PATH); 143 job.setInputFormat(SequenceFileInputFormat.class); 144 job.setOutputFormat(NullOutputFormat.class); 145 job.setMapperClass(ReuseDetector.class); 146 job.setOutputKeyClass(Text.class); 147 job.setOutputValueClass(Text.class); 148 job.setNumMapTasks(24); 149 job.setNumReduceTasks(0); 150 RunningJob result = JobClient.runJob(job); 151 long uses = result.getCounters().findCounter("jvm", "use").getValue(); 152 int maps = job.getNumMapTasks(); 153 if (reuse) { 154 assertTrue("maps = " + maps + ", uses = " + uses, maps < uses); 155 } else { 156 assertEquals("uses should be number of maps", job.getNumMapTasks(), uses); 157 } 158 } 159 testMapReduceSort()160 public void testMapReduceSort() throws Exception { 161 // Run randomwriter to generate input for 'sort' 162 runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH); 163 164 // Run sort 165 runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH); 166 167 // Run sort-validator to check if sort worked correctly 168 runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH, 169 SORT_OUTPUT_PATH); 170 // test JVM reuse 171 runJvmReuseTest(mrCluster.createJobConf(), true); 172 173 // test no JVM reuse 174 runJvmReuseTest(mrCluster.createJobConf(), false); 175 } 176 } 177