1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.examples.terasort; 20 21 import java.io.IOException; 22 import java.util.zip.Checksum; 23 24 import org.apache.hadoop.conf.Configuration; 25 import org.apache.hadoop.conf.Configured; 26 import org.apache.hadoop.fs.Path; 27 import org.apache.hadoop.io.BytesWritable; 28 import org.apache.hadoop.io.Text; 29 import org.apache.hadoop.mapreduce.Cluster; 30 import org.apache.hadoop.mapreduce.Job; 31 import org.apache.hadoop.mapreduce.Mapper; 32 import org.apache.hadoop.mapreduce.Reducer; 33 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 34 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 35 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 36 import org.apache.hadoop.util.PureJavaCrc32; 37 import org.apache.hadoop.util.Tool; 38 import org.apache.hadoop.util.ToolRunner; 39 40 /** 41 * Generate 1 mapper per a file that checks to make sure the keys 42 * are sorted within each file. The mapper also generates 43 * "$file:begin", first key and "$file:end", last key. The reduce verifies that 44 * all of the start/end items are in order. 45 * Any output from the reduce is problem report. 46 * <p> 47 * To run the program: 48 * <b>bin/hadoop jar hadoop-*-examples.jar teravalidate out-dir report-dir</b> 49 * <p> 50 * If there is any output, something is wrong and the output of the reduce 51 * will have the problem report. 52 */ 53 public class TeraValidate extends Configured implements Tool { 54 private static final Text ERROR = new Text("error"); 55 private static final Text CHECKSUM = new Text("checksum"); 56 textifyBytes(Text t)57 private static String textifyBytes(Text t) { 58 BytesWritable b = new BytesWritable(); 59 b.set(t.getBytes(), 0, t.getLength()); 60 return b.toString(); 61 } 62 63 static class ValidateMapper extends Mapper<Text,Text,Text,Text> { 64 private Text lastKey; 65 private String filename; 66 private Unsigned16 checksum = new Unsigned16(); 67 private Unsigned16 tmp = new Unsigned16(); 68 private Checksum crc32 = new PureJavaCrc32(); 69 70 /** 71 * Get the final part of the input name 72 * @param split the input split 73 * @return the "part-r-00000" for the input 74 */ getFilename(FileSplit split)75 private String getFilename(FileSplit split) { 76 return split.getPath().getName(); 77 } 78 map(Text key, Text value, Context context)79 public void map(Text key, Text value, Context context) 80 throws IOException, InterruptedException { 81 if (lastKey == null) { 82 FileSplit fs = (FileSplit) context.getInputSplit(); 83 filename = getFilename(fs); 84 context.write(new Text(filename + ":begin"), key); 85 lastKey = new Text(); 86 } else { 87 if (key.compareTo(lastKey) < 0) { 88 context.write(ERROR, new Text("misorder in " + filename + 89 " between " + textifyBytes(lastKey) + 90 " and " + textifyBytes(key))); 91 } 92 } 93 // compute the crc of the key and value and add it to the sum 94 crc32.reset(); 95 crc32.update(key.getBytes(), 0, key.getLength()); 96 crc32.update(value.getBytes(), 0, value.getLength()); 97 tmp.set(crc32.getValue()); 98 checksum.add(tmp); 99 lastKey.set(key); 100 } 101 cleanup(Context context)102 public void cleanup(Context context) 103 throws IOException, InterruptedException { 104 if (lastKey != null) { 105 context.write(new Text(filename + ":end"), lastKey); 106 context.write(CHECKSUM, new Text(checksum.toString())); 107 } 108 } 109 } 110 111 /** 112 * Check the boundaries between the output files by making sure that the 113 * boundary keys are always increasing. 114 * Also passes any error reports along intact. 115 */ 116 static class ValidateReducer extends Reducer<Text,Text,Text,Text> { 117 private boolean firstKey = true; 118 private Text lastKey = new Text(); 119 private Text lastValue = new Text(); reduce(Text key, Iterable<Text> values, Context context)120 public void reduce(Text key, Iterable<Text> values, 121 Context context) throws IOException, InterruptedException { 122 if (ERROR.equals(key)) { 123 for (Text val : values) { 124 context.write(key, val); 125 } 126 } else if (CHECKSUM.equals(key)) { 127 Unsigned16 tmp = new Unsigned16(); 128 Unsigned16 sum = new Unsigned16(); 129 for (Text val : values) { 130 tmp.set(val.toString()); 131 sum.add(tmp); 132 } 133 context.write(CHECKSUM, new Text(sum.toString())); 134 } else { 135 Text value = values.iterator().next(); 136 if (firstKey) { 137 firstKey = false; 138 } else { 139 if (value.compareTo(lastValue) < 0) { 140 context.write(ERROR, 141 new Text("bad key partitioning:\n file " + 142 lastKey + " key " + 143 textifyBytes(lastValue) + 144 "\n file " + key + " key " + 145 textifyBytes(value))); 146 } 147 } 148 lastKey.set(key); 149 lastValue.set(value); 150 } 151 } 152 153 } 154 usage()155 private static void usage() throws IOException { 156 System.err.println("teravalidate <out-dir> <report-dir>"); 157 } 158 run(String[] args)159 public int run(String[] args) throws Exception { 160 Job job = Job.getInstance(getConf()); 161 if (args.length != 2) { 162 usage(); 163 return 1; 164 } 165 TeraInputFormat.setInputPaths(job, new Path(args[0])); 166 FileOutputFormat.setOutputPath(job, new Path(args[1])); 167 job.setJobName("TeraValidate"); 168 job.setJarByClass(TeraValidate.class); 169 job.setMapperClass(ValidateMapper.class); 170 job.setReducerClass(ValidateReducer.class); 171 job.setOutputKeyClass(Text.class); 172 job.setOutputValueClass(Text.class); 173 // force a single reducer 174 job.setNumReduceTasks(1); 175 // force a single split 176 FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE); 177 job.setInputFormatClass(TeraInputFormat.class); 178 return job.waitForCompletion(true) ? 0 : 1; 179 } 180 181 /** 182 * @param args 183 */ main(String[] args)184 public static void main(String[] args) throws Exception { 185 int res = ToolRunner.run(new Configuration(), new TeraValidate(), args); 186 System.exit(res); 187 } 188 189 } 190