/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.*;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.*;
/**
 * A set of utilities to validate the <b>sort</b> of the map-reduce framework.
 * This utility program has 2 main parts:
 * 1. Checking the records' statistics
 *   a) Validates the no. of bytes and records in sort's input & output.
 *   b) Validates the xor of the hashes of each key/value pair.
 *   c) Ensures the same key/value is present in both input and output.
 * 2. Checking individual records to ensure each record is present in both
 *    the input and the output of the sort (expensive on large data-sets).
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate
 *            [-m <i>maps</i>] [-r <i>reduces</i>] [-deep]
 *            -sortInput <i>sort-in-dir</i> -sortOutput <i>sort-out-dir</i>
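 *
 * A minimal programmatic invocation via {@link ToolRunner} (the directory
 * paths below are placeholders):
 * <pre>
 *   int res = ToolRunner.run(new Configuration(), new SortValidator(),
 *       new String[] {"-sortInput", "/sort/in", "-sortOutput", "/sort/out"});
 *   System.exit(res);
 * </pre>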
 */
public class SortValidator extends Configured implements Tool {

  static private final IntWritable sortInput = new IntWritable(1);
  static private final IntWritable sortOutput = new IntWritable(2);

  static void printUsage() {
    System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
                       "-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
    System.exit(1);
  }

  static private IntWritable deduceInputFile(JobConf job) {
    Path[] inputPaths = FileInputFormat.getInputPaths(job);
    Path inputFile = new Path(job.get("map.input.file"));

    // value == one for sort-input; value == two for sort-output
    return (inputFile.getParent().equals(inputPaths[0])) ?
        sortInput : sortOutput;
  }

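  // Concatenate the raw bytes of two BytesWritables; RecordChecker uses this
  // to treat an entire (key, value) pair as a single composite key.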
  static private byte[] pair(BytesWritable a, BytesWritable b) {
    byte[] pairData = new byte[a.getLength() + b.getLength()];
    System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength());
    System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength());
    return pairData;
  }

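  // Accept only the sort's actual data files ("part-00000", "part-00001",
  // ...), ignoring anything else in the directory.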
  private static final PathFilter sortPathsFilter = new PathFilter() {
    public boolean accept(Path path) {
      return (path.getName().startsWith("part-"));
    }
  };

  /**
   * A simple map-reduce job which checks consistency of the
   * MapReduce framework's sort by checking:
   * a) Records are sorted correctly
   * b) Keys are partitioned correctly
   * c) The input and output have the same no. of bytes and records.
   * d) The input and output have the correct 'checksum' by xor'ing
   *    the hash of each record.
   *
   */
  public static class RecordStatsChecker {

    /**
     * Generic way to get <b>raw</b> data from a {@link Writable}.
     */
    static class Raw {
      /**
       * Get raw data bytes from a {@link Writable}
       * @param writable {@link Writable} object from which to get the raw data
       * @return raw data of the writable
       */
      public byte[] getRawBytes(Writable writable) {
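        // Fallback: go through toString(). Subclasses override this with
        // cheaper, exact raw representations for known types.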
        return writable.toString().getBytes();
      }

      /**
       * Get number of raw data bytes of the {@link Writable}
       * @param writable {@link Writable} object from which to get the raw data
       *                 length
       * @return number of raw data bytes
       */
      public int getRawBytesLength(Writable writable) {
        return writable.toString().getBytes().length;
      }
    }

    /**
     * Specialization of {@link Raw} for {@link BytesWritable}.
     */
    static class RawBytesWritable extends Raw {
      public byte[] getRawBytes(Writable bw) {
        return ((BytesWritable)bw).getBytes();
      }
      public int getRawBytesLength(Writable bw) {
        return ((BytesWritable)bw).getLength();
      }
    }

    /**
     * Specialization of {@link Raw} for {@link Text}.
     */
    static class RawText extends Raw {
      public byte[] getRawBytes(Writable text) {
        return ((Text)text).getBytes();
      }
      public int getRawBytesLength(Writable text) {
        return ((Text)text).getLength();
      }
    }

    private static Raw createRaw(Class rawClass) {
      if (rawClass == Text.class) {
        return new RawText();
      } else if (rawClass == BytesWritable.class) {
        System.err.println("Returning " + RawBytesWritable.class);
        return new RawBytesWritable();
      }
      return new Raw();
    }

    public static class RecordStatsWritable implements Writable {
      private long bytes = 0;
      private long records = 0;
      private int checksum = 0;

      public RecordStatsWritable() {}

      public RecordStatsWritable(long bytes, long records, int checksum) {
        this.bytes = bytes;
        this.records = records;
        this.checksum = checksum;
      }

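      // Variable-length (VLong/VInt) encoding keeps these small, frequently
      // shuffled records compact on the wire.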
      public void write(DataOutput out) throws IOException {
        WritableUtils.writeVLong(out, bytes);
        WritableUtils.writeVLong(out, records);
        WritableUtils.writeVInt(out, checksum);
      }

      public void readFields(DataInput in) throws IOException {
        bytes = WritableUtils.readVLong(in);
        records = WritableUtils.readVLong(in);
        checksum = WritableUtils.readVInt(in);
      }

      public long getBytes() { return bytes; }
      public long getRecords() { return records; }
      public int getChecksum() { return checksum; }
    }

    public static class Map extends MapReduceBase
      implements Mapper<WritableComparable, Writable,
                        IntWritable, RecordStatsWritable> {

      private IntWritable key = null;
      private WritableComparable prevKey = null;
      private Class<? extends WritableComparable> keyClass;
      private Partitioner<WritableComparable, Writable> partitioner = null;
      private int partition = -1;
      private int noSortReducers = -1;
      private long recordId = -1;
      private JobConf jobConf = null;

      private Raw rawKey;
      private Raw rawValue;

      public void configure(JobConf job) {
        // Keep the conf around: map() needs it to clone keys.
        jobConf = job;
        // 'key' == sortInput for sort-input; key == sortOutput for sort-output
        key = deduceInputFile(job);

        if (key == sortOutput) {
          partitioner = new HashPartitioner<WritableComparable, Writable>();

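          // Each sort-output file is named after the reducer that wrote it
          // ('part-NNNNN'), so the partition each of its keys should belong
          // to can be recovered from the file name and re-checked in map().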
          // Figure the 'current' partition and no. of reduces of the 'sort'
          try {
            URI inputURI = new URI(job.get("map.input.file"));
            String inputFile = inputURI.getPath();
            partition = Integer.parseInt(
                inputFile.substring(inputFile.lastIndexOf("part") + 5));
            noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
          } catch (Exception e) {
            System.err.println("Caught: " + e);
            System.exit(-1);
          }
        }
      }

      @SuppressWarnings("unchecked")
      public void map(WritableComparable key, Writable value,
                      OutputCollector<IntWritable, RecordStatsWritable> output,
                      Reporter reporter) throws IOException {
        // Set up rawKey and rawValue on the first call to 'map'
        if (recordId == -1) {
          rawKey = createRaw(key.getClass());
          rawValue = createRaw(value.getClass());
        }
        ++recordId;

        if (this.key == sortOutput) {
          // Check if keys are 'sorted' if this
          // record is from sort's output
          if (prevKey == null) {
            // Clone the key: the framework reuses the key object across calls
            // to map(), so holding a plain reference would compare each key
            // with itself.
            prevKey = WritableUtils.clone(key, jobConf);
            keyClass = prevKey.getClass();
          } else {
            // Sanity check
            if (keyClass != key.getClass()) {
              throw new IOException("Type mismatch in key: expected " +
                                    keyClass.getName() + ", received " +
                                    key.getClass().getName());
            }

            // Check if they were sorted correctly
            if (prevKey.compareTo(key) > 0) {
              throw new IOException("The 'map-reduce' framework wrongly " +
                                    "classified (" + prevKey + ") > (" + key +
                                    ") for record# " + recordId);
            }
            prevKey = WritableUtils.clone(key, jobConf);
          }

          // Check if the sorted output is 'partitioned' right
          int keyPartition =
            partitioner.getPartition(key, value, noSortReducers);
          if (partition != keyPartition) {
            throw new IOException("Partitions do not match for record# " +
                                  recordId + " ! - '" + partition + "' v/s '" +
                                  keyPartition + "'");
          }
        }

        // Construct the record-stats and output (this.key, record-stats)
        byte[] keyBytes = rawKey.getRawBytes(key);
        int keyBytesLen = rawKey.getRawBytesLength(key);
        byte[] valueBytes = rawValue.getRawBytes(value);
        int valueBytesLen = rawValue.getRawBytesLength(value);

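        // Since xor is commutative and associative, the aggregate checksum
        // the reducer builds from these per-record values is independent of
        // record order, so sort-input and sort-output must produce the same
        // aggregate if they hold the same records.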
        int keyValueChecksum =
          (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
           WritableComparator.hashBytes(valueBytes, valueBytesLen));

        output.collect(this.key,
                       new RecordStatsWritable((keyBytesLen + valueBytesLen),
                                               1, keyValueChecksum));
      }

    }

    public static class Reduce extends MapReduceBase
      implements Reducer<IntWritable, RecordStatsWritable,
                         IntWritable, RecordStatsWritable> {

      public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
                         OutputCollector<IntWritable,
                                         RecordStatsWritable> output,
                         Reporter reporter) throws IOException {
        long bytes = 0;
        long records = 0;
        int xor = 0;
        while (values.hasNext()) {
          RecordStatsWritable stats = values.next();
          bytes += stats.getBytes();
          records += stats.getRecords();
          xor ^= stats.getChecksum();
        }

        output.collect(key, new RecordStatsWritable(bytes, records, xor));
      }
    }

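    // Force one split per file so that each map processes exactly one whole
    // 'part-*' file, keeping the number of maps aligned with the number of
    // files being checked.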
    public static class NonSplitableSequenceFileInputFormat
      extends SequenceFileInputFormat {
      protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
      }
    }

    static void checkRecords(Configuration defaults,
                             Path sortInput, Path sortOutput) throws IOException {
      FileSystem inputfs = sortInput.getFileSystem(defaults);
      FileSystem outputfs = sortOutput.getFileSystem(defaults);
      FileSystem defaultfs = FileSystem.get(defaults);
      JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
      jobConf.setJobName("sortvalidate-recordstats-checker");

      int noSortReduceTasks =
        outputfs.listStatus(sortOutput, sortPathsFilter).length;
      jobConf.setInt("sortvalidate.sort.reduce.tasks", noSortReduceTasks);
      int noSortInputpaths = inputfs.listStatus(sortInput).length;

      jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
      jobConf.setOutputFormat(SequenceFileOutputFormat.class);

      jobConf.setOutputKeyClass(IntWritable.class);
      jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class);

      jobConf.setMapperClass(Map.class);
      jobConf.setCombinerClass(Reduce.class);
      jobConf.setReducerClass(Reduce.class);

      jobConf.setNumMapTasks(noSortReduceTasks);
      jobConf.setNumReduceTasks(1);

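      // Feed both the sort's input and output to the same job: the mapper
      // tags each record's stats with key 1 (sort-input) or 2 (sort-output),
      // and the single reducer emits one aggregate RecordStatsWritable per
      // tag, which are compared below.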
      FileInputFormat.setInputPaths(jobConf, sortInput);
      FileInputFormat.addInputPath(jobConf, sortOutput);
      Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
      if (defaultfs.exists(outputPath)) {
        defaultfs.delete(outputPath, true);
      }
      FileOutputFormat.setOutputPath(jobConf, outputPath);

      // Uncomment to run locally in a single process
      //jobConf.set("mapred.job.tracker", "local");
      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
      System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
                         "from " + inputPaths[0] + " (" +
                         noSortInputpaths + " files), " +
                         inputPaths[1] + " (" +
                         noSortReduceTasks +
                         " files) into " +
                         FileOutputFormat.getOutputPath(jobConf) +
                         " with 1 reducer.");
      Date startTime = new Date();
      System.out.println("Job started: " + startTime);
      JobClient.runJob(jobConf);
      Date endTime = new Date();
      System.out.println("Job ended: " + endTime);
      System.out.println("The job took " +
                         (endTime.getTime() - startTime.getTime()) / 1000 +
                         " seconds.");

      // Check to ensure that the statistics of the
      // framework's sort-input and sort-output match
      SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
          new Path(outputPath, "part-00000"), defaults);
      IntWritable k1 = new IntWritable();
      IntWritable k2 = new IntWritable();
      RecordStatsWritable v1 = new RecordStatsWritable();
      RecordStatsWritable v2 = new RecordStatsWritable();
      if (!stats.next(k1, v1)) {
        throw new IOException("Failed to read record #1 from reduce's output");
      }
      if (!stats.next(k2, v2)) {
        throw new IOException("Failed to read record #2 from reduce's output");
      }

      if ((v1.getBytes() != v2.getBytes()) ||
          (v1.getRecords() != v2.getRecords()) ||
          v1.getChecksum() != v2.getChecksum()) {
        throw new IOException("Records do not match: sort-input (" +
            v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() +
            ") v/s sort-output (" +
            v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() +
            ")");
      }
    }

  }

  /**
   * A simple map-reduce task to check if the input and the output
   * of the framework's sort are consistent by ensuring each record
   * is present in both the input and the output.
   *
   */
  public static class RecordChecker {

    public static class Map extends MapReduceBase
      implements Mapper<BytesWritable, BytesWritable,
                        BytesWritable, IntWritable> {

      private IntWritable value = null;

      public void configure(JobConf job) {
        // value == one for sort-input; value == two for sort-output
        value = deduceInputFile(job);
      }

      public void map(BytesWritable key,
                      BytesWritable value,
                      OutputCollector<BytesWritable, IntWritable> output,
                      Reporter reporter) throws IOException {
        // newKey = (key, value)
        BytesWritable keyValue = new BytesWritable(pair(key, value));

        // output (newKey, value)
        output.collect(keyValue, this.value);
      }
    }

    public static class Reduce extends MapReduceBase
      implements Reducer<BytesWritable, IntWritable,
                         BytesWritable, IntWritable> {

      public void reduce(BytesWritable key, Iterator<IntWritable> values,
                         OutputCollector<BytesWritable, IntWritable> output,
                         Reporter reporter) throws IOException {
        int ones = 0;
        int twos = 0;
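        // Count how many times this (key, value) pair was seen in the
        // sort's input (tagged 1) and in its output (tagged 2).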
        while (values.hasNext()) {
          IntWritable count = values.next();
          if (count.equals(sortInput)) {
            ++ones;
          } else if (count.equals(sortOutput)) {
            ++twos;
          } else {
            throw new IOException("Invalid 'value' of " + count.get() +
                                  " for (key, value): " + key.toString());
          }
        }

        // Check to ensure there are equal no. of ones and twos
        if (ones != twos) {
          throw new IOException("Illegal ('one', 'two'): (" + ones + ", " +
                                twos + ") for (key, value): " +
                                key.toString());
        }
      }
    }

    static void checkRecords(Configuration defaults, int noMaps, int noReduces,
                             Path sortInput, Path sortOutput) throws IOException {
      JobConf jobConf = new JobConf(defaults, RecordChecker.class);
      jobConf.setJobName("sortvalidate-record-checker");

      jobConf.setInputFormat(SequenceFileInputFormat.class);
      jobConf.setOutputFormat(SequenceFileOutputFormat.class);

      jobConf.setOutputKeyClass(BytesWritable.class);
      jobConf.setOutputValueClass(IntWritable.class);

      jobConf.setMapperClass(Map.class);
      jobConf.setReducerClass(Reduce.class);

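      // Unless -m/-r were supplied on the command line, size the job from
      // the cluster: roughly 10 maps per tracker and ~90% of the cluster's
      // reduce slots.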
      JobClient client = new JobClient(jobConf);
      ClusterStatus cluster = client.getClusterStatus();
      if (noMaps == -1) {
        noMaps = cluster.getTaskTrackers() *
          jobConf.getInt("test.sortvalidate.maps_per_host", 10);
      }
      if (noReduces == -1) {
        noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
        String sortReduces = jobConf.get("test.sortvalidate.reduces_per_host");
        if (sortReduces != null) {
          noReduces = cluster.getTaskTrackers() *
                      Integer.parseInt(sortReduces);
        }
      }
      jobConf.setNumMapTasks(noMaps);
      jobConf.setNumReduceTasks(noReduces);

      FileInputFormat.setInputPaths(jobConf, sortInput);
      FileInputFormat.addInputPath(jobConf, sortOutput);
      Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
      FileSystem fs = FileSystem.get(defaults);
      if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
      }
      FileOutputFormat.setOutputPath(jobConf, outputPath);

      // Uncomment to run locally in a single process
      //jobConf.set("mapred.job.tracker", "local");
      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
      System.out.println("\nSortValidator.RecordChecker: Running on " +
                         cluster.getTaskTrackers() +
                         " nodes to validate sort from " +
                         inputPaths[0] + ", " +
                         inputPaths[1] + " into " +
                         FileOutputFormat.getOutputPath(jobConf) +
                         " with " + noReduces + " reduces.");
      Date startTime = new Date();
      System.out.println("Job started: " + startTime);
      JobClient.runJob(jobConf);
      Date endTime = new Date();
      System.out.println("Job ended: " + endTime);
      System.out.println("The job took " +
                         (endTime.getTime() - startTime.getTime()) / 1000 +
                         " seconds.");
    }
  }

  /**
   * The main driver for the sort-validator program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there are communication problems with the
   *                     job tracker.
   */
  public int run(String[] args) throws Exception {
    Configuration defaults = getConf();

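    // Parse the command line; -m, -r and -deep are optional,
    // -sortInput and -sortOutput are required.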
    int noMaps = -1, noReduces = -1;
    Path sortInput = null, sortOutput = null;
    boolean deepTest = false;
    for (int i = 0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          noMaps = Integer.parseInt(args[++i]);
        } else if ("-r".equals(args[i])) {
          noReduces = Integer.parseInt(args[++i]);
        } else if ("-sortInput".equals(args[i])) {
          sortInput = new Path(args[++i]);
        } else if ("-sortOutput".equals(args[i])) {
          sortOutput = new Path(args[++i]);
        } else if ("-deep".equals(args[i])) {
          deepTest = true;
        } else {
          printUsage();
          return -1;
        }
      } catch (NumberFormatException except) {
        System.err.println("ERROR: Integer expected instead of " + args[i]);
        printUsage();
        return -1;
      } catch (ArrayIndexOutOfBoundsException except) {
        System.err.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        printUsage();
        return -1;
      }
    }

    // Sanity check
    if (sortInput == null || sortOutput == null) {
      printUsage();
      return -2;
    }

    // Check if the records are consistent and sorted correctly
    RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput);

    // Check if the same records are present in sort's inputs & outputs
    if (deepTest) {
      RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput,
                                 sortOutput);
    }

    System.out.println("\nSUCCESS! Validated the MapReduce framework's " +
                       "'sort'.");

    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new SortValidator(), args);
    System.exit(res);
  }
}