1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.mapred; 19 20 import java.io.BufferedReader; 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.io.InputStreamReader; 24 import java.io.OutputStream; 25 import java.io.OutputStreamWriter; 26 import java.io.Writer; 27 import java.util.ArrayList; 28 import java.util.Arrays; 29 import java.util.Iterator; 30 import java.util.List; 31 import java.util.StringTokenizer; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.fs.FileUtil; 36 import org.apache.hadoop.fs.Path; 37 import org.apache.hadoop.io.LongWritable; 38 import org.apache.hadoop.io.SequenceFile; 39 import org.apache.hadoop.io.Text; 40 import org.apache.hadoop.util.ReflectionUtils; 41 42 public class TestBadRecords extends ClusterMapReduceTestCase { 43 44 private static final Log LOG = 45 LogFactory.getLog(TestBadRecords.class); 46 47 private static final List<String> MAPPER_BAD_RECORDS = 48 Arrays.asList("hello01","hello04","hello05"); 49 50 private static final List<String> REDUCER_BAD_RECORDS = 51 Arrays.asList("hello08","hello10"); 52 53 private List<String> input; 54 TestBadRecords()55 public TestBadRecords() { 56 input = new ArrayList<String>(); 57 for(int i=1;i<=10;i++) { 58 String str = ""+i; 59 int zerosToPrepend = 2 - str.length(); 60 for(int j=0;j<zerosToPrepend;j++){ 61 str = "0"+str; 62 } 63 input.add("hello"+str); 64 } 65 } 66 runMapReduce(JobConf conf, List<String> mapperBadRecords, List<String> redBadRecords)67 private void runMapReduce(JobConf conf, 68 List<String> mapperBadRecords, List<String> redBadRecords) 69 throws Exception { 70 createInput(); 71 conf.setJobName("mr"); 72 conf.setNumMapTasks(1); 73 conf.setNumReduceTasks(1); 74 conf.setInt("mapred.task.timeout", 30*1000); 75 SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE); 76 SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE); 77 78 SkipBadRecords.setAttemptsToStartSkipping(conf,0); 79 //the no of attempts to successfully complete the task depends 80 //on the no of bad records. 81 conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+1+ 82 mapperBadRecords.size()); 83 conf.setMaxReduceAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+ 84 1+redBadRecords.size()); 85 86 FileInputFormat.setInputPaths(conf, getInputDir()); 87 FileOutputFormat.setOutputPath(conf, getOutputDir()); 88 conf.setInputFormat(TextInputFormat.class); 89 conf.setMapOutputKeyClass(LongWritable.class); 90 conf.setMapOutputValueClass(Text.class); 91 conf.setOutputFormat(TextOutputFormat.class); 92 conf.setOutputKeyClass(LongWritable.class); 93 conf.setOutputValueClass(Text.class); 94 RunningJob runningJob = JobClient.runJob(conf); 95 validateOutput(conf, runningJob, mapperBadRecords, redBadRecords); 96 } 97 98 createInput()99 private void createInput() throws Exception { 100 OutputStream os = getFileSystem().create(new Path(getInputDir(), 101 "text.txt")); 102 Writer wr = new OutputStreamWriter(os); 103 for(String inp : input) { 104 wr.write(inp+"\n"); 105 }wr.close(); 106 } 107 validateOutput(JobConf conf, RunningJob runningJob, List<String> mapperBadRecords, List<String> redBadRecords)108 private void validateOutput(JobConf conf, RunningJob runningJob, 109 List<String> mapperBadRecords, List<String> redBadRecords) 110 throws Exception{ 111 LOG.info(runningJob.getCounters().toString()); 112 assertTrue(runningJob.isSuccessful()); 113 114 //validate counters 115 Counters counters = runningJob.getCounters(); 116 assertEquals(counters.findCounter(Task.Counter.MAP_SKIPPED_RECORDS). 117 getCounter(),mapperBadRecords.size()); 118 119 int mapRecs = input.size() - mapperBadRecords.size(); 120 assertEquals(counters.findCounter(Task.Counter.MAP_INPUT_RECORDS). 121 getCounter(),mapRecs); 122 assertEquals(counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS). 123 getCounter(),mapRecs); 124 125 int redRecs = mapRecs - redBadRecords.size(); 126 assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_RECORDS). 127 getCounter(),redBadRecords.size()); 128 assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_GROUPS). 129 getCounter(),redBadRecords.size()); 130 assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_GROUPS). 131 getCounter(),redRecs); 132 assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_RECORDS). 133 getCounter(),redRecs); 134 assertEquals(counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS). 135 getCounter(),redRecs); 136 137 //validate skipped records 138 Path skipDir = SkipBadRecords.getSkipOutputPath(conf); 139 Path[] skips = FileUtil.stat2Paths(getFileSystem().listStatus(skipDir)); 140 List<String> mapSkipped = new ArrayList<String>(); 141 List<String> redSkipped = new ArrayList<String>(); 142 for(Path skipPath : skips) { 143 LOG.info("skipPath: " + skipPath); 144 145 SequenceFile.Reader reader = new SequenceFile.Reader( 146 getFileSystem(), skipPath, conf); 147 Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf); 148 Object value = ReflectionUtils.newInstance(reader.getValueClass(), 149 conf); 150 key = reader.next(key); 151 while(key!=null) { 152 value = reader.getCurrentValue(value); 153 LOG.debug("key:"+key+" value:"+value.toString()); 154 if(skipPath.getName().contains("_r_")) { 155 redSkipped.add(value.toString()); 156 } else { 157 mapSkipped.add(value.toString()); 158 } 159 key = reader.next(key); 160 } 161 reader.close(); 162 } 163 assertTrue(mapSkipped.containsAll(mapperBadRecords)); 164 assertTrue(redSkipped.containsAll(redBadRecords)); 165 166 Path[] outputFiles = FileUtil.stat2Paths( 167 getFileSystem().listStatus(getOutputDir(), 168 new Utils.OutputFileUtils.OutputFilesFilter())); 169 170 List<String> mapperOutput=getProcessed(input, mapperBadRecords); 171 LOG.debug("mapperOutput " + mapperOutput.size()); 172 List<String> reducerOutput=getProcessed(mapperOutput, redBadRecords); 173 LOG.debug("reducerOutput " + reducerOutput.size()); 174 175 if (outputFiles.length > 0) { 176 InputStream is = getFileSystem().open(outputFiles[0]); 177 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 178 String line = reader.readLine(); 179 int counter = 0; 180 while (line != null) { 181 counter++; 182 StringTokenizer tokeniz = new StringTokenizer(line, "\t"); 183 String key = tokeniz.nextToken(); 184 String value = tokeniz.nextToken(); 185 LOG.debug("Output: key:"+key + " value:"+value); 186 assertTrue(value.contains("hello")); 187 188 189 assertTrue(reducerOutput.contains(value)); 190 line = reader.readLine(); 191 } 192 reader.close(); 193 assertEquals(reducerOutput.size(), counter); 194 } 195 } 196 getProcessed(List<String> inputs, List<String> badRecs)197 private List<String> getProcessed(List<String> inputs, List<String> badRecs) { 198 List<String> processed = new ArrayList<String>(); 199 for(String input : inputs) { 200 if(!badRecs.contains(input)) { 201 processed.add(input); 202 } 203 } 204 return processed; 205 } 206 testBadMapRed()207 public void testBadMapRed() throws Exception { 208 JobConf conf = createJobConf(); 209 conf.setMapperClass(BadMapper.class); 210 conf.setReducerClass(BadReducer.class); 211 runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS); 212 } 213 214 215 static class BadMapper extends MapReduceBase implements 216 Mapper<LongWritable, Text, LongWritable, Text> { 217 map(LongWritable key, Text val, OutputCollector<LongWritable, Text> output, Reporter reporter)218 public void map(LongWritable key, Text val, 219 OutputCollector<LongWritable, Text> output, Reporter reporter) 220 throws IOException { 221 String str = val.toString(); 222 LOG.debug("MAP key:" +key +" value:" + str); 223 if(MAPPER_BAD_RECORDS.get(0).equals(str)) { 224 LOG.warn("MAP Encountered BAD record"); 225 System.exit(-1); 226 } 227 else if(MAPPER_BAD_RECORDS.get(1).equals(str)) { 228 LOG.warn("MAP Encountered BAD record"); 229 throw new RuntimeException("Bad record "+str); 230 } 231 else if(MAPPER_BAD_RECORDS.get(2).equals(str)) { 232 try { 233 LOG.warn("MAP Encountered BAD record"); 234 Thread.sleep(15*60*1000); 235 } catch (InterruptedException e) { 236 e.printStackTrace(); 237 } 238 } 239 output.collect(key, val); 240 } 241 } 242 243 static class BadReducer extends MapReduceBase implements 244 Reducer<LongWritable, Text, LongWritable, Text> { 245 reduce(LongWritable key, Iterator<Text> values, OutputCollector<LongWritable, Text> output, Reporter reporter)246 public void reduce(LongWritable key, Iterator<Text> values, 247 OutputCollector<LongWritable, Text> output, Reporter reporter) 248 throws IOException { 249 while(values.hasNext()) { 250 Text value = values.next(); 251 LOG.debug("REDUCE key:" +key +" value:" + value); 252 if(REDUCER_BAD_RECORDS.get(0).equals(value.toString())) { 253 LOG.warn("REDUCE Encountered BAD record"); 254 System.exit(-1); 255 } 256 else if(REDUCER_BAD_RECORDS.get(1).equals(value.toString())) { 257 try { 258 LOG.warn("REDUCE Encountered BAD record"); 259 Thread.sleep(15*60*1000); 260 } catch (InterruptedException e) { 261 e.printStackTrace(); 262 } 263 } 264 output.collect(key, value); 265 } 266 267 } 268 } 269 270 271 } 272