1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.examples.terasort; 19 20 import java.io.IOException; 21 import java.util.zip.Checksum; 22 23 import org.apache.hadoop.conf.Configuration; 24 import org.apache.hadoop.conf.Configured; 25 import org.apache.hadoop.fs.Path; 26 import org.apache.hadoop.io.NullWritable; 27 import org.apache.hadoop.io.Text; 28 import org.apache.hadoop.mapreduce.Cluster; 29 import org.apache.hadoop.mapreduce.Job; 30 import org.apache.hadoop.mapreduce.Mapper; 31 import org.apache.hadoop.mapreduce.Reducer; 32 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 33 import org.apache.hadoop.util.PureJavaCrc32; 34 import org.apache.hadoop.util.Tool; 35 import org.apache.hadoop.util.ToolRunner; 36 37 public class TeraChecksum extends Configured implements Tool { 38 static class ChecksumMapper 39 extends Mapper<Text, Text, NullWritable, Unsigned16> { 40 private Unsigned16 checksum = new Unsigned16(); 41 private Unsigned16 sum = new Unsigned16(); 42 private Checksum crc32 = new PureJavaCrc32(); 43 map(Text key, Text value, Context context)44 public void map(Text key, Text value, 45 Context context) throws IOException { 46 crc32.reset(); 47 crc32.update(key.getBytes(), 0, key.getLength()); 48 crc32.update(value.getBytes(), 0, value.getLength()); 49 checksum.set(crc32.getValue()); 50 sum.add(checksum); 51 } 52 cleanup(Context context)53 public void cleanup(Context context) 54 throws IOException, InterruptedException { 55 context.write(NullWritable.get(), sum); 56 } 57 } 58 59 static class ChecksumReducer 60 extends Reducer<NullWritable, Unsigned16, NullWritable, Unsigned16> { 61 reduce(NullWritable key, Iterable<Unsigned16> values, Context context)62 public void reduce(NullWritable key, Iterable<Unsigned16> values, 63 Context context) throws IOException, InterruptedException { 64 Unsigned16 sum = new Unsigned16(); 65 for (Unsigned16 val : values) { 66 sum.add(val); 67 } 68 context.write(key, sum); 69 } 70 } 71 usage()72 private static void usage() throws IOException { 73 System.err.println("terasum <out-dir> <report-dir>"); 74 } 75 run(String[] args)76 public int run(String[] args) throws Exception { 77 Job job = Job.getInstance(getConf()); 78 if (args.length != 2) { 79 usage(); 80 return 2; 81 } 82 TeraInputFormat.setInputPaths(job, new Path(args[0])); 83 FileOutputFormat.setOutputPath(job, new Path(args[1])); 84 job.setJobName("TeraSum"); 85 job.setJarByClass(TeraChecksum.class); 86 job.setMapperClass(ChecksumMapper.class); 87 job.setReducerClass(ChecksumReducer.class); 88 job.setOutputKeyClass(NullWritable.class); 89 job.setOutputValueClass(Unsigned16.class); 90 // force a single reducer 91 job.setNumReduceTasks(1); 92 job.setInputFormatClass(TeraInputFormat.class); 93 return job.waitForCompletion(true) ? 0 : 1; 94 } 95 96 /** 97 * @param args 98 */ main(String[] args)99 public static void main(String[] args) throws Exception { 100 int res = ToolRunner.run(new Configuration(), new TeraChecksum(), args); 101 System.exit(res); 102 } 103 104 } 105