1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.examples.terasort;
19 
20 import java.io.IOException;
21 import java.util.zip.Checksum;
22 
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.conf.Configured;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.io.NullWritable;
27 import org.apache.hadoop.io.Text;
28 import org.apache.hadoop.mapreduce.Cluster;
29 import org.apache.hadoop.mapreduce.Job;
30 import org.apache.hadoop.mapreduce.Mapper;
31 import org.apache.hadoop.mapreduce.Reducer;
32 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
33 import org.apache.hadoop.util.PureJavaCrc32;
34 import org.apache.hadoop.util.Tool;
35 import org.apache.hadoop.util.ToolRunner;
36 
37 public class TeraChecksum extends Configured implements Tool {
38   static class ChecksumMapper
39       extends Mapper<Text, Text, NullWritable, Unsigned16> {
40     private Unsigned16 checksum = new Unsigned16();
41     private Unsigned16 sum = new Unsigned16();
42     private Checksum crc32 = new PureJavaCrc32();
43 
map(Text key, Text value, Context context)44     public void map(Text key, Text value,
45                     Context context) throws IOException {
46       crc32.reset();
47       crc32.update(key.getBytes(), 0, key.getLength());
48       crc32.update(value.getBytes(), 0, value.getLength());
49       checksum.set(crc32.getValue());
50       sum.add(checksum);
51     }
52 
cleanup(Context context)53     public void cleanup(Context context)
54         throws IOException, InterruptedException {
55       context.write(NullWritable.get(), sum);
56     }
57   }
58 
59   static class ChecksumReducer
60       extends Reducer<NullWritable, Unsigned16, NullWritable, Unsigned16> {
61 
reduce(NullWritable key, Iterable<Unsigned16> values, Context context)62     public void reduce(NullWritable key, Iterable<Unsigned16> values,
63         Context context) throws IOException, InterruptedException  {
64       Unsigned16 sum = new Unsigned16();
65       for (Unsigned16 val : values) {
66         sum.add(val);
67       }
68       context.write(key, sum);
69     }
70   }
71 
usage()72   private static void usage() throws IOException {
73     System.err.println("terasum <out-dir> <report-dir>");
74   }
75 
run(String[] args)76   public int run(String[] args) throws Exception {
77     Job job = Job.getInstance(getConf());
78     if (args.length != 2) {
79       usage();
80       return 2;
81     }
82     TeraInputFormat.setInputPaths(job, new Path(args[0]));
83     FileOutputFormat.setOutputPath(job, new Path(args[1]));
84     job.setJobName("TeraSum");
85     job.setJarByClass(TeraChecksum.class);
86     job.setMapperClass(ChecksumMapper.class);
87     job.setReducerClass(ChecksumReducer.class);
88     job.setOutputKeyClass(NullWritable.class);
89     job.setOutputValueClass(Unsigned16.class);
90     // force a single reducer
91     job.setNumReduceTasks(1);
92     job.setInputFormatClass(TeraInputFormat.class);
93     return job.waitForCompletion(true) ? 0 : 1;
94   }
95 
96   /**
97    * @param args
98    */
main(String[] args)99   public static void main(String[] args) throws Exception {
100     int res = ToolRunner.run(new Configuration(), new TeraChecksum(), args);
101     System.exit(res);
102   }
103 
104 }
105