1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapred;
19 
20 import java.io.BufferedReader;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InputStreamReader;
24 import java.io.OutputStream;
25 import java.io.OutputStreamWriter;
26 import java.io.Writer;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.StringTokenizer;
32 
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.fs.FileUtil;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.io.LongWritable;
38 import org.apache.hadoop.io.SequenceFile;
39 import org.apache.hadoop.io.Text;
40 import org.apache.hadoop.util.ReflectionUtils;
41 
42 public class TestBadRecords extends ClusterMapReduceTestCase {
43 
44   private static final Log LOG =
45     LogFactory.getLog(TestBadRecords.class);
46 
47   private static final List<String> MAPPER_BAD_RECORDS =
48     Arrays.asList("hello01","hello04","hello05");
49 
50   private static final List<String> REDUCER_BAD_RECORDS =
51     Arrays.asList("hello08","hello10");
52 
53   private List<String> input;
54 
TestBadRecords()55   public TestBadRecords() {
56     input = new ArrayList<String>();
57     for(int i=1;i<=10;i++) {
58       String str = ""+i;
59       int zerosToPrepend = 2 - str.length();
60       for(int j=0;j<zerosToPrepend;j++){
61         str = "0"+str;
62       }
63       input.add("hello"+str);
64     }
65   }
66 
runMapReduce(JobConf conf, List<String> mapperBadRecords, List<String> redBadRecords)67   private void runMapReduce(JobConf conf,
68       List<String> mapperBadRecords, List<String> redBadRecords)
69         throws Exception {
70     createInput();
71     conf.setJobName("mr");
72     conf.setNumMapTasks(1);
73     conf.setNumReduceTasks(1);
74     conf.setInt("mapred.task.timeout", 30*1000);
75     SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
76     SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);
77 
78     SkipBadRecords.setAttemptsToStartSkipping(conf,0);
79     //the no of attempts to successfully complete the task depends
80     //on the no of bad records.
81     conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+1+
82         mapperBadRecords.size());
83     conf.setMaxReduceAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+
84         1+redBadRecords.size());
85 
86     FileInputFormat.setInputPaths(conf, getInputDir());
87     FileOutputFormat.setOutputPath(conf, getOutputDir());
88     conf.setInputFormat(TextInputFormat.class);
89     conf.setMapOutputKeyClass(LongWritable.class);
90     conf.setMapOutputValueClass(Text.class);
91     conf.setOutputFormat(TextOutputFormat.class);
92     conf.setOutputKeyClass(LongWritable.class);
93     conf.setOutputValueClass(Text.class);
94     RunningJob runningJob = JobClient.runJob(conf);
95     validateOutput(conf, runningJob, mapperBadRecords, redBadRecords);
96   }
97 
98 
createInput()99   private void createInput() throws Exception {
100     OutputStream os = getFileSystem().create(new Path(getInputDir(),
101         "text.txt"));
102     Writer wr = new OutputStreamWriter(os);
103     for(String inp : input) {
104       wr.write(inp+"\n");
105     }wr.close();
106   }
107 
validateOutput(JobConf conf, RunningJob runningJob, List<String> mapperBadRecords, List<String> redBadRecords)108   private void validateOutput(JobConf conf, RunningJob runningJob,
109       List<String> mapperBadRecords, List<String> redBadRecords)
110     throws Exception{
111     LOG.info(runningJob.getCounters().toString());
112     assertTrue(runningJob.isSuccessful());
113 
114     //validate counters
115     Counters counters = runningJob.getCounters();
116     assertEquals(counters.findCounter(Task.Counter.MAP_SKIPPED_RECORDS).
117         getCounter(),mapperBadRecords.size());
118 
119     int mapRecs = input.size() - mapperBadRecords.size();
120     assertEquals(counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).
121         getCounter(),mapRecs);
122     assertEquals(counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).
123         getCounter(),mapRecs);
124 
125     int redRecs = mapRecs - redBadRecords.size();
126     assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_RECORDS).
127         getCounter(),redBadRecords.size());
128     assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_GROUPS).
129         getCounter(),redBadRecords.size());
130     assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_GROUPS).
131         getCounter(),redRecs);
132     assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_RECORDS).
133         getCounter(),redRecs);
134     assertEquals(counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).
135         getCounter(),redRecs);
136 
137     //validate skipped records
138     Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
139     Path[] skips = FileUtil.stat2Paths(getFileSystem().listStatus(skipDir));
140     List<String> mapSkipped = new ArrayList<String>();
141     List<String> redSkipped = new ArrayList<String>();
142     for(Path skipPath : skips) {
143       LOG.info("skipPath: " + skipPath);
144 
145       SequenceFile.Reader reader = new SequenceFile.Reader(
146           getFileSystem(), skipPath, conf);
147       Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
148       Object value = ReflectionUtils.newInstance(reader.getValueClass(),
149           conf);
150       key = reader.next(key);
151       while(key!=null) {
152         value = reader.getCurrentValue(value);
153         LOG.debug("key:"+key+" value:"+value.toString());
154         if(skipPath.getName().contains("_r_")) {
155           redSkipped.add(value.toString());
156         } else {
157           mapSkipped.add(value.toString());
158         }
159         key = reader.next(key);
160       }
161       reader.close();
162     }
163     assertTrue(mapSkipped.containsAll(mapperBadRecords));
164     assertTrue(redSkipped.containsAll(redBadRecords));
165 
166     Path[] outputFiles = FileUtil.stat2Paths(
167         getFileSystem().listStatus(getOutputDir(),
168         new Utils.OutputFileUtils.OutputFilesFilter()));
169 
170     List<String> mapperOutput=getProcessed(input, mapperBadRecords);
171     LOG.debug("mapperOutput " + mapperOutput.size());
172     List<String> reducerOutput=getProcessed(mapperOutput, redBadRecords);
173     LOG.debug("reducerOutput " + reducerOutput.size());
174 
175    if (outputFiles.length > 0) {
176       InputStream is = getFileSystem().open(outputFiles[0]);
177       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
178       String line = reader.readLine();
179       int counter = 0;
180       while (line != null) {
181         counter++;
182         StringTokenizer tokeniz = new StringTokenizer(line, "\t");
183         String key = tokeniz.nextToken();
184         String value = tokeniz.nextToken();
185         LOG.debug("Output: key:"+key + "  value:"+value);
186         assertTrue(value.contains("hello"));
187 
188 
189         assertTrue(reducerOutput.contains(value));
190         line = reader.readLine();
191       }
192       reader.close();
193       assertEquals(reducerOutput.size(), counter);
194     }
195   }
196 
getProcessed(List<String> inputs, List<String> badRecs)197   private List<String> getProcessed(List<String> inputs, List<String> badRecs) {
198     List<String> processed = new ArrayList<String>();
199     for(String input : inputs) {
200       if(!badRecs.contains(input)) {
201         processed.add(input);
202       }
203     }
204     return processed;
205   }
206 
testBadMapRed()207   public void testBadMapRed() throws Exception {
208     JobConf conf = createJobConf();
209     conf.setMapperClass(BadMapper.class);
210     conf.setReducerClass(BadReducer.class);
211     runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
212   }
213 
214 
215   static class BadMapper extends MapReduceBase implements
216     Mapper<LongWritable, Text, LongWritable, Text> {
217 
map(LongWritable key, Text val, OutputCollector<LongWritable, Text> output, Reporter reporter)218     public void map(LongWritable key, Text val,
219         OutputCollector<LongWritable, Text> output, Reporter reporter)
220         throws IOException {
221       String str = val.toString();
222       LOG.debug("MAP key:" +key +"  value:" + str);
223       if(MAPPER_BAD_RECORDS.get(0).equals(str)) {
224         LOG.warn("MAP Encountered BAD record");
225         System.exit(-1);
226       }
227       else if(MAPPER_BAD_RECORDS.get(1).equals(str)) {
228         LOG.warn("MAP Encountered BAD record");
229         throw new RuntimeException("Bad record "+str);
230       }
231       else if(MAPPER_BAD_RECORDS.get(2).equals(str)) {
232         try {
233           LOG.warn("MAP Encountered BAD record");
234           Thread.sleep(15*60*1000);
235         } catch (InterruptedException e) {
236           e.printStackTrace();
237         }
238       }
239       output.collect(key, val);
240     }
241   }
242 
243   static class BadReducer extends MapReduceBase implements
244     Reducer<LongWritable, Text, LongWritable, Text> {
245 
reduce(LongWritable key, Iterator<Text> values, OutputCollector<LongWritable, Text> output, Reporter reporter)246     public void reduce(LongWritable key, Iterator<Text> values,
247         OutputCollector<LongWritable, Text> output, Reporter reporter)
248         throws IOException {
249       while(values.hasNext()) {
250         Text value = values.next();
251         LOG.debug("REDUCE key:" +key +"  value:" + value);
252         if(REDUCER_BAD_RECORDS.get(0).equals(value.toString())) {
253           LOG.warn("REDUCE Encountered BAD record");
254           System.exit(-1);
255         }
256         else if(REDUCER_BAD_RECORDS.get(1).equals(value.toString())) {
257           try {
258             LOG.warn("REDUCE Encountered BAD record");
259             Thread.sleep(15*60*1000);
260           } catch (InterruptedException e) {
261             e.printStackTrace();
262           }
263         }
264         output.collect(key, value);
265       }
266 
267     }
268   }
269 
270 
271 }
272