1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.mapred; 19 20 import java.io.*; 21 import java.util.Iterator; 22 23 import junit.framework.TestCase; 24 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 import org.apache.hadoop.io.IntWritable; 28 import org.apache.hadoop.io.LongWritable; 29 import org.apache.hadoop.io.Text; 30 import org.apache.hadoop.io.Writable; 31 import org.apache.hadoop.io.WritableComparable; 32 import org.apache.hadoop.fs.FileSystem; 33 import org.apache.hadoop.fs.Path; 34 import org.apache.hadoop.mapred.lib.IdentityReducer; 35 import org.apache.hadoop.conf.Configuration; 36 import org.apache.hadoop.hdfs.MiniDFSCluster; 37 38 /** 39 * Class to test mapred task's 40 * - temp directory 41 * - child env 42 */ 43 public class TestMiniMRChildTask extends TestCase { 44 private static final Log LOG = 45 LogFactory.getLog(TestMiniMRChildTask.class.getName()); 46 47 private final static String OLD_CONFIGS = "test.old.configs"; 48 private final static String TASK_OPTS_VAL = "-Xmx200m"; 49 private final static String MAP_OPTS_VAL = "-Xmx200m"; 50 private final static String REDUCE_OPTS_VAL = "-Xmx300m"; 51 52 private MiniMRCluster mr; 53 private MiniDFSCluster dfs; 54 private FileSystem fileSys; 55 56 /** 57 * Map class which checks whether temp directory exists 58 * and check the value of java.io.tmpdir 59 * Creates a tempfile and checks whether that is created in 60 * temp directory specified. 61 */ 62 public static class MapClass extends MapReduceBase 63 implements Mapper<LongWritable, Text, Text, IntWritable> { 64 Path tmpDir; 65 FileSystem localFs; map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)66 public void map (LongWritable key, Text value, 67 OutputCollector<Text, IntWritable> output, 68 Reporter reporter) throws IOException { 69 String tmp = null; 70 if (localFs.exists(tmpDir)) { 71 tmp = tmpDir.makeQualified(localFs).toString(); 72 73 assertEquals(tmp, new Path(System.getProperty("java.io.tmpdir")). 74 makeQualified(localFs).toString()); 75 } else { 76 fail("Temp directory "+tmpDir +" doesnt exist."); 77 } 78 File tmpFile = File.createTempFile("test", ".tmp"); 79 assertEquals(tmp, new Path(tmpFile.getParent()). 80 makeQualified(localFs).toString()); 81 } configure(JobConf job)82 public void configure(JobConf job) { 83 tmpDir = new Path(job.get("mapred.child.tmp", "./tmp")); 84 try { 85 localFs = FileSystem.getLocal(job); 86 } catch (IOException ioe) { 87 ioe.printStackTrace(); 88 fail("IOException in getting localFS"); 89 } 90 } 91 } 92 93 // configure a job configure(JobConf conf, Path inDir, Path outDir, String input, Class<? extends Mapper> map, Class<? extends Reducer> reduce)94 private void configure(JobConf conf, Path inDir, Path outDir, String input, 95 Class<? extends Mapper> map, 96 Class<? extends Reducer> reduce) 97 throws IOException { 98 // set up the input file system and write input text. 99 FileSystem inFs = inDir.getFileSystem(conf); 100 FileSystem outFs = outDir.getFileSystem(conf); 101 outFs.delete(outDir, true); 102 if (!inFs.mkdirs(inDir)) { 103 throw new IOException("Mkdirs failed to create " + inDir.toString()); 104 } 105 { 106 // write input into input file 107 DataOutputStream file = inFs.create(new Path(inDir, "part-0")); 108 file.writeBytes(input); 109 file.close(); 110 } 111 112 // configure the mapred Job which creates a tempfile in map. 113 conf.setJobName("testmap"); 114 conf.setMapperClass(map); 115 conf.setReducerClass(reduce); 116 conf.setNumMapTasks(1); 117 conf.setNumReduceTasks(0); 118 FileInputFormat.setInputPaths(conf, inDir); 119 FileOutputFormat.setOutputPath(conf, outDir); 120 String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", 121 "/tmp")).toString().replace(' ', '+'); 122 conf.set("test.build.data", TEST_ROOT_DIR); 123 } 124 125 /** 126 * Launch tests 127 * @param conf Configuration of the mapreduce job. 128 * @param inDir input path 129 * @param outDir output path 130 * @param input Input text 131 * @throws IOException 132 */ launchTest(JobConf conf, Path inDir, Path outDir, String input)133 public void launchTest(JobConf conf, 134 Path inDir, 135 Path outDir, 136 String input) 137 throws IOException { 138 configure(conf, inDir, outDir, input, 139 MapClass.class, IdentityReducer.class); 140 141 FileSystem outFs = outDir.getFileSystem(conf); 142 143 // Launch job with default option for temp dir. 144 // i.e. temp dir is ./tmp 145 JobClient.runJob(conf); 146 outFs.delete(outDir, true); 147 148 // Launch job by giving relative path to temp dir. 149 conf.set("mapred.child.tmp", "../temp"); 150 JobClient.runJob(conf); 151 outFs.delete(outDir, true); 152 153 // Launch job by giving absolute path to temp dir 154 conf.set("mapred.child.tmp", "/tmp"); 155 JobClient.runJob(conf); 156 outFs.delete(outDir, true); 157 } 158 checkEnv(String envName, String expValue, String mode)159 private static void checkEnv(String envName, String expValue, String mode) { 160 String envValue = System.getenv(envName).trim(); 161 if ("append".equals(mode)) { 162 if (envValue == null || !envValue.contains(":")) { 163 throw new RuntimeException("Missing env variable"); 164 } else { 165 String parts[] = envValue.split(":"); 166 // check if the value is appended 167 if (!parts[parts.length - 1].equals(expValue)) { 168 throw new RuntimeException("Wrong env variable in append mode"); 169 } 170 } 171 } else { 172 if (envValue == null || !envValue.equals(expValue)) { 173 throw new RuntimeException("Wrong env variable in noappend mode"); 174 } 175 } 176 } 177 178 // Mappers that simply checks if the desired user env are present or not 179 static class EnvCheckMapper extends MapReduceBase implements 180 Mapper<WritableComparable, Writable, WritableComparable, Writable> { 181 configure(JobConf job)182 public void configure(JobConf job) { 183 boolean oldConfigs = job.getBoolean(OLD_CONFIGS, false); 184 if (oldConfigs) { 185 String javaOpts = job.get(JobConf.MAPRED_TASK_JAVA_OPTS); 186 assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", 187 javaOpts); 188 assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + 189 javaOpts, 190 javaOpts, TASK_OPTS_VAL); 191 } else { 192 String mapJavaOpts = job.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS); 193 assertNotNull(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " is null!", 194 mapJavaOpts); 195 assertEquals(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " has value of: " + 196 mapJavaOpts, 197 mapJavaOpts, MAP_OPTS_VAL); 198 } 199 200 String path = job.get("path"); 201 202 // check if the pwd is there in LD_LIBRARY_PATH 203 String pwd = System.getenv("PWD"); 204 205 assertTrue("LD doesnt contain pwd", 206 System.getenv("LD_LIBRARY_PATH").contains(pwd)); 207 208 // check if X=$X:/abc works for LD_LIBRARY_PATH 209 checkEnv("LD_LIBRARY_PATH", "/tmp", "append"); 210 // check if X=/tmp works for an already existing parameter 211 checkEnv("HOME", "/tmp", "noappend"); 212 // check if X=/tmp for a new env variable 213 checkEnv("MY_PATH", "/tmp", "noappend"); 214 // check if X=$X:/tmp works for a new env var and results into :/tmp 215 checkEnv("NEW_PATH", ":/tmp", "noappend"); 216 // check if X=$(tt's X var):/tmp for an old env variable inherited from 217 // the tt 218 checkEnv("PATH", path + ":/tmp", "noappend"); 219 } 220 map(WritableComparable key, Writable value, OutputCollector<WritableComparable, Writable> out, Reporter reporter)221 public void map(WritableComparable key, Writable value, 222 OutputCollector<WritableComparable, Writable> out, 223 Reporter reporter) 224 throws IOException { 225 } 226 } 227 228 static class EnvCheckReducer extends MapReduceBase 229 implements Reducer<WritableComparable, Writable, WritableComparable, Writable> { 230 231 @Override configure(JobConf job)232 public void configure(JobConf job) { 233 boolean oldConfigs = job.getBoolean(OLD_CONFIGS, false); 234 if (oldConfigs) { 235 String javaOpts = job.get(JobConf.MAPRED_TASK_JAVA_OPTS); 236 assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", 237 javaOpts); 238 assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + 239 javaOpts, 240 javaOpts, TASK_OPTS_VAL); 241 } else { 242 String reduceJavaOpts = job.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS); 243 assertNotNull(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " is null!", 244 reduceJavaOpts); 245 assertEquals(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " has value of: " + 246 reduceJavaOpts, 247 reduceJavaOpts, REDUCE_OPTS_VAL); 248 } 249 250 String path = job.get("path"); 251 252 // check if the pwd is there in LD_LIBRARY_PATH 253 String pwd = System.getenv("PWD"); 254 255 assertTrue("LD doesnt contain pwd", 256 System.getenv("LD_LIBRARY_PATH").contains(pwd)); 257 258 // check if X=$X:/abc works for LD_LIBRARY_PATH 259 checkEnv("LD_LIBRARY_PATH", "/tmp", "append"); 260 // check if X=/tmp works for an already existing parameter 261 checkEnv("HOME", "/tmp", "noappend"); 262 // check if X=/tmp for a new env variable 263 checkEnv("MY_PATH", "/tmp", "noappend"); 264 // check if X=$X:/tmp works for a new env var and results into :/tmp 265 checkEnv("NEW_PATH", ":/tmp", "noappend"); 266 // check if X=$(tt's X var):/tmp for an old env variable inherited from 267 // the tt 268 checkEnv("PATH", path + ":/tmp", "noappend"); 269 270 } 271 272 @Override reduce(WritableComparable key, Iterator<Writable> values, OutputCollector<WritableComparable, Writable> output, Reporter reporter)273 public void reduce(WritableComparable key, Iterator<Writable> values, 274 OutputCollector<WritableComparable, Writable> output, 275 Reporter reporter) 276 throws IOException { 277 } 278 279 } 280 281 @Override setUp()282 public void setUp() { 283 try { 284 // create configuration, dfs, file system and mapred cluster 285 dfs = new MiniDFSCluster(new Configuration(), 1, true, null); 286 fileSys = dfs.getFileSystem(); 287 mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1); 288 } catch (IOException ioe) { 289 tearDown(); 290 } 291 } 292 293 @Override tearDown()294 public void tearDown() { 295 // close file system and shut down dfs and mapred cluster 296 try { 297 if (fileSys != null) { 298 fileSys.close(); 299 } 300 if (dfs != null) { 301 dfs.shutdown(); 302 } 303 if (mr != null) { 304 mr.shutdown(); 305 } 306 } catch (IOException ioe) { 307 LOG.info("IO exception in closing file system)" ); 308 ioe.printStackTrace(); 309 } 310 } 311 312 /** 313 * Tests task's temp directory. 314 * 315 * In this test, we give different values to mapred.child.tmp 316 * both relative and absolute. And check whether the temp directory 317 * is created. We also check whether java.io.tmpdir value is same as 318 * the directory specified. We create a temp file and check if is is 319 * created in the directory specified. 320 */ testTaskTempDir()321 public void testTaskTempDir(){ 322 try { 323 JobConf conf = mr.createJobConf(); 324 325 // intialize input, output directories 326 Path inDir = new Path("testing/wc/input"); 327 Path outDir = new Path("testing/wc/output"); 328 String input = "The input"; 329 330 launchTest(conf, inDir, outDir, input); 331 332 } catch(Exception e) { 333 e.printStackTrace(); 334 fail("Exception in testing temp dir"); 335 tearDown(); 336 } 337 } 338 339 /** 340 * Test to test if the user set env variables reflect in the child 341 * processes. Mainly 342 * - x=y (x can be a already existing env variable or a new variable) 343 * - x=$x:y (replace $x with the current value of x) 344 */ testTaskEnv()345 public void testTaskEnv(){ 346 try { 347 JobConf conf = mr.createJobConf(); 348 // initialize input, output directories 349 Path inDir = new Path("testing/wc/input1"); 350 Path outDir = new Path("testing/wc/output1"); 351 FileSystem outFs = outDir.getFileSystem(conf); 352 runTestTaskEnv(conf, inDir, outDir, false); 353 outFs.delete(outDir, true); 354 } catch(Exception e) { 355 e.printStackTrace(); 356 fail("Exception in testing child env"); 357 tearDown(); 358 } 359 } 360 361 /** 362 * Test to test if the user set *old* env variables reflect in the child 363 * processes. Mainly 364 * - x=y (x can be a already existing env variable or a new variable) 365 * - x=$x:y (replace $x with the current value of x) 366 */ testTaskOldEnv()367 public void testTaskOldEnv(){ 368 try { 369 JobConf conf = mr.createJobConf(); 370 // initialize input, output directories 371 Path inDir = new Path("testing/wc/input1"); 372 Path outDir = new Path("testing/wc/output1"); 373 FileSystem outFs = outDir.getFileSystem(conf); 374 runTestTaskEnv(conf, inDir, outDir, true); 375 outFs.delete(outDir, true); 376 } catch(Exception e) { 377 e.printStackTrace(); 378 fail("Exception in testing child env"); 379 tearDown(); 380 } 381 } 382 runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs)383 void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs) 384 throws IOException { 385 String input = "The input"; 386 configure(conf, inDir, outDir, input, EnvCheckMapper.class, 387 EnvCheckReducer.class); 388 // test 389 // - new SET of new var (MY_PATH) 390 // - set of old var (HOME) 391 // - append to an old var from modified env (LD_LIBRARY_PATH) 392 // - append to an old var from tt's env (PATH) 393 // - append to a new var (NEW_PATH) 394 String mapTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV; 395 String reduceTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV; 396 String mapTaskJavaOptsKey = JobConf.MAPRED_MAP_TASK_JAVA_OPTS; 397 String reduceTaskJavaOptsKey = JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS; 398 String mapTaskJavaOpts = MAP_OPTS_VAL; 399 String reduceTaskJavaOpts = REDUCE_OPTS_VAL; 400 conf.setBoolean(OLD_CONFIGS, oldConfigs); 401 if (oldConfigs) { 402 mapTaskEnvKey = reduceTaskEnvKey = JobConf.MAPRED_TASK_ENV; 403 mapTaskJavaOptsKey = reduceTaskJavaOptsKey = JobConf.MAPRED_TASK_JAVA_OPTS; 404 mapTaskJavaOpts = reduceTaskJavaOpts = TASK_OPTS_VAL; 405 } 406 conf.set(mapTaskEnvKey, 407 "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," + 408 "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp"); 409 conf.set(reduceTaskEnvKey, 410 "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," + 411 "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp"); 412 413 conf.set("path", System.getenv("PATH")); 414 415 conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts); 416 conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts); 417 418 RunningJob job = JobClient.runJob(conf); 419 assertTrue("The environment checker job failed.", job.isSuccessful()); 420 } 421 422 } 423