1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.*; 22 import java.net.URI; 23 import java.util.*; 24 25 import org.apache.hadoop.conf.Configuration; 26 import org.apache.hadoop.conf.Configured; 27 import org.apache.hadoop.io.BytesWritable; 28 import org.apache.hadoop.io.IntWritable; 29 import org.apache.hadoop.io.SequenceFile; 30 import org.apache.hadoop.io.Text; 31 import org.apache.hadoop.io.Writable; 32 import org.apache.hadoop.io.WritableComparable; 33 import org.apache.hadoop.io.WritableComparator; 34 import org.apache.hadoop.io.WritableUtils; 35 import org.apache.hadoop.mapred.lib.HashPartitioner; 36 import org.apache.hadoop.util.Tool; 37 import org.apache.hadoop.util.ToolRunner; 38 import org.apache.hadoop.fs.*; 39 40 /** 41 * A set of utilities to validate the <b>sort</b> of the map-reduce framework. 42 * This utility program has 2 main parts: 43 * 1. Checking the records' statistics 44 * a) Validates the no. of bytes and records in sort's input & output. 45 * b) Validates the xor of the md5's of each key/value pair. 46 * c) Ensures same key/value is present in both input and output. 47 * 2. Check individual records to ensure each record is present in both 48 * the input and the output of the sort (expensive on large data-sets). 49 * 50 * To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate 51 * [-m <i>maps</i>] [-r <i>reduces</i>] [-deep] 52 * -sortInput <i>sort-in-dir</i> -sortOutput <i>sort-out-dir</i> 53 */ 54 public class SortValidator extends Configured implements Tool { 55 56 static private final IntWritable sortInput = new IntWritable(1); 57 static private final IntWritable sortOutput = new IntWritable(2); 58 printUsage()59 static void printUsage() { 60 System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " + 61 "-sortInput <sort-input-dir> -sortOutput <sort-output-dir>"); 62 System.exit(1); 63 } 64 deduceInputFile(JobConf job)65 static private IntWritable deduceInputFile(JobConf job) { 66 Path[] inputPaths = FileInputFormat.getInputPaths(job); 67 Path inputFile = new Path(job.get("map.input.file")); 68 69 // value == one for sort-input; value == two for sort-output 70 return (inputFile.getParent().equals(inputPaths[0])) ? 71 sortInput : sortOutput; 72 } 73 pair(BytesWritable a, BytesWritable b)74 static private byte[] pair(BytesWritable a, BytesWritable b) { 75 byte[] pairData = new byte[a.getLength()+ b.getLength()]; 76 System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength()); 77 System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength()); 78 return pairData; 79 } 80 81 private static final PathFilter sortPathsFilter = new PathFilter() { 82 public boolean accept(Path path) { 83 return (path.getName().startsWith("part-")); 84 } 85 }; 86 87 /** 88 * A simple map-reduce job which checks consistency of the 89 * MapReduce framework's sort by checking: 90 * a) Records are sorted correctly 91 * b) Keys are partitioned correctly 92 * c) The input and output have same no. of bytes and records. 93 * d) The input and output have the correct 'checksum' by xor'ing 94 * the md5 of each record. 95 * 96 */ 97 public static class RecordStatsChecker { 98 99 /** 100 * Generic way to get <b>raw</b> data from a {@link Writable}. 101 */ 102 static class Raw { 103 /** 104 * Get raw data bytes from a {@link Writable} 105 * @param writable {@link Writable} object from whom to get the raw data 106 * @return raw data of the writable 107 */ getRawBytes(Writable writable)108 public byte[] getRawBytes(Writable writable) { 109 return writable.toString().getBytes(); 110 } 111 112 /** 113 * Get number of raw data bytes of the {@link Writable} 114 * @param writable {@link Writable} object from whom to get the raw data 115 * length 116 * @return number of raw data bytes 117 */ getRawBytesLength(Writable writable)118 public int getRawBytesLength(Writable writable) { 119 return writable.toString().getBytes().length; 120 } 121 } 122 123 /** 124 * Specialization of {@link Raw} for {@link BytesWritable}. 125 */ 126 static class RawBytesWritable extends Raw { getRawBytes(Writable bw)127 public byte[] getRawBytes(Writable bw) { 128 return ((BytesWritable)bw).getBytes(); 129 } getRawBytesLength(Writable bw)130 public int getRawBytesLength(Writable bw) { 131 return ((BytesWritable)bw).getLength(); 132 } 133 } 134 135 /** 136 * Specialization of {@link Raw} for {@link Text}. 137 */ 138 static class RawText extends Raw { getRawBytes(Writable text)139 public byte[] getRawBytes(Writable text) { 140 return ((Text)text).getBytes(); 141 } getRawBytesLength(Writable text)142 public int getRawBytesLength(Writable text) { 143 return ((Text)text).getLength(); 144 } 145 } 146 createRaw(Class rawClass)147 private static Raw createRaw(Class rawClass) { 148 if (rawClass == Text.class) { 149 return new RawText(); 150 } else if (rawClass == BytesWritable.class) { 151 System.err.println("Returning " + RawBytesWritable.class); 152 return new RawBytesWritable(); 153 } 154 return new Raw(); 155 } 156 157 public static class RecordStatsWritable implements Writable { 158 private long bytes = 0; 159 private long records = 0; 160 private int checksum = 0; 161 RecordStatsWritable()162 public RecordStatsWritable() {} 163 RecordStatsWritable(long bytes, long records, int checksum)164 public RecordStatsWritable(long bytes, long records, int checksum) { 165 this.bytes = bytes; 166 this.records = records; 167 this.checksum = checksum; 168 } 169 write(DataOutput out)170 public void write(DataOutput out) throws IOException { 171 WritableUtils.writeVLong(out, bytes); 172 WritableUtils.writeVLong(out, records); 173 WritableUtils.writeVInt(out, checksum); 174 } 175 readFields(DataInput in)176 public void readFields(DataInput in) throws IOException { 177 bytes = WritableUtils.readVLong(in); 178 records = WritableUtils.readVLong(in); 179 checksum = WritableUtils.readVInt(in); 180 } 181 getBytes()182 public long getBytes() { return bytes; } getRecords()183 public long getRecords() { return records; } getChecksum()184 public int getChecksum() { return checksum; } 185 } 186 187 public static class Map extends MapReduceBase 188 implements Mapper<WritableComparable, Writable, 189 IntWritable, RecordStatsWritable> { 190 191 private IntWritable key = null; 192 private WritableComparable prevKey = null; 193 private Class<? extends WritableComparable> keyClass; 194 private Partitioner<WritableComparable, Writable> partitioner = null; 195 private int partition = -1; 196 private int noSortReducers = -1; 197 private long recordId = -1; 198 199 private Raw rawKey; 200 private Raw rawValue; 201 configure(JobConf job)202 public void configure(JobConf job) { 203 // 'key' == sortInput for sort-input; key == sortOutput for sort-output 204 key = deduceInputFile(job); 205 206 if (key == sortOutput) { 207 partitioner = new HashPartitioner<WritableComparable, Writable>(); 208 209 // Figure the 'current' partition and no. of reduces of the 'sort' 210 try { 211 URI inputURI = new URI(job.get("map.input.file")); 212 String inputFile = inputURI.getPath(); 213 partition = Integer.valueOf( 214 inputFile.substring(inputFile.lastIndexOf("part")+5) 215 ).intValue(); 216 noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1); 217 } catch (Exception e) { 218 System.err.println("Caught: " + e); 219 System.exit(-1); 220 } 221 } 222 } 223 224 @SuppressWarnings("unchecked") map(WritableComparable key, Writable value, OutputCollector<IntWritable, RecordStatsWritable> output, Reporter reporter)225 public void map(WritableComparable key, Writable value, 226 OutputCollector<IntWritable, RecordStatsWritable> output, 227 Reporter reporter) throws IOException { 228 // Set up rawKey and rawValue on the first call to 'map' 229 if (recordId == -1) { 230 rawKey = createRaw(key.getClass()); 231 rawValue = createRaw(value.getClass()); 232 } 233 ++recordId; 234 235 if (this.key == sortOutput) { 236 // Check if keys are 'sorted' if this 237 // record is from sort's output 238 if (prevKey == null) { 239 prevKey = key; 240 keyClass = prevKey.getClass(); 241 } else { 242 // Sanity check 243 if (keyClass != key.getClass()) { 244 throw new IOException("Type mismatch in key: expected " + 245 keyClass.getName() + ", recieved " + 246 key.getClass().getName()); 247 } 248 249 // Check if they were sorted correctly 250 if (prevKey.compareTo(key) > 0) { 251 throw new IOException("The 'map-reduce' framework wrongly" + 252 " classifed (" + prevKey + ") > (" + 253 key + ") "+ "for record# " + recordId); 254 } 255 prevKey = key; 256 } 257 258 // Check if the sorted output is 'partitioned' right 259 int keyPartition = 260 partitioner.getPartition(key, value, noSortReducers); 261 if (partition != keyPartition) { 262 throw new IOException("Partitions do not match for record# " + 263 recordId + " ! - '" + partition + "' v/s '" + 264 keyPartition + "'"); 265 } 266 } 267 268 // Construct the record-stats and output (this.key, record-stats) 269 byte[] keyBytes = rawKey.getRawBytes(key); 270 int keyBytesLen = rawKey.getRawBytesLength(key); 271 byte[] valueBytes = rawValue.getRawBytes(value); 272 int valueBytesLen = rawValue.getRawBytesLength(value); 273 274 int keyValueChecksum = 275 (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^ 276 WritableComparator.hashBytes(valueBytes, valueBytesLen)); 277 278 output.collect(this.key, 279 new RecordStatsWritable((keyBytesLen+valueBytesLen), 280 1, keyValueChecksum) 281 ); 282 } 283 284 } 285 286 public static class Reduce extends MapReduceBase 287 implements Reducer<IntWritable, RecordStatsWritable, 288 IntWritable, RecordStatsWritable> { 289 reduce(IntWritable key, Iterator<RecordStatsWritable> values, OutputCollector<IntWritable, RecordStatsWritable> output, Reporter reporter)290 public void reduce(IntWritable key, Iterator<RecordStatsWritable> values, 291 OutputCollector<IntWritable, 292 RecordStatsWritable> output, 293 Reporter reporter) throws IOException { 294 long bytes = 0; 295 long records = 0; 296 int xor = 0; 297 while (values.hasNext()) { 298 RecordStatsWritable stats = values.next(); 299 bytes += stats.getBytes(); 300 records += stats.getRecords(); 301 xor ^= stats.getChecksum(); 302 } 303 304 output.collect(key, new RecordStatsWritable(bytes, records, xor)); 305 } 306 } 307 308 public static class NonSplitableSequenceFileInputFormat 309 extends SequenceFileInputFormat { isSplitable(FileSystem fs, Path filename)310 protected boolean isSplitable(FileSystem fs, Path filename) { 311 return false; 312 } 313 } 314 checkRecords(Configuration defaults, Path sortInput, Path sortOutput)315 static void checkRecords(Configuration defaults, 316 Path sortInput, Path sortOutput) throws IOException { 317 FileSystem inputfs = sortInput.getFileSystem(defaults); 318 FileSystem outputfs = sortOutput.getFileSystem(defaults); 319 FileSystem defaultfs = FileSystem.get(defaults); 320 JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class); 321 jobConf.setJobName("sortvalidate-recordstats-checker"); 322 323 int noSortReduceTasks = 324 outputfs.listStatus(sortOutput, sortPathsFilter).length; 325 jobConf.setInt("sortvalidate.sort.reduce.tasks", noSortReduceTasks); 326 int noSortInputpaths = inputfs.listStatus(sortInput).length; 327 328 jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); 329 jobConf.setOutputFormat(SequenceFileOutputFormat.class); 330 331 jobConf.setOutputKeyClass(IntWritable.class); 332 jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class); 333 334 jobConf.setMapperClass(Map.class); 335 jobConf.setCombinerClass(Reduce.class); 336 jobConf.setReducerClass(Reduce.class); 337 338 jobConf.setNumMapTasks(noSortReduceTasks); 339 jobConf.setNumReduceTasks(1); 340 341 FileInputFormat.setInputPaths(jobConf, sortInput); 342 FileInputFormat.addInputPath(jobConf, sortOutput); 343 Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker"); 344 if (defaultfs.exists(outputPath)) { 345 defaultfs.delete(outputPath, true); 346 } 347 FileOutputFormat.setOutputPath(jobConf, outputPath); 348 349 // Uncomment to run locally in a single process 350 //job_conf.set("mapred.job.tracker", "local"); 351 Path[] inputPaths = FileInputFormat.getInputPaths(jobConf); 352 System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " + 353 "from " + inputPaths[0] + " (" + 354 noSortInputpaths + " files), " + 355 inputPaths[1] + " (" + 356 noSortReduceTasks + 357 " files) into " + 358 FileOutputFormat.getOutputPath(jobConf) + 359 " with 1 reducer."); 360 Date startTime = new Date(); 361 System.out.println("Job started: " + startTime); 362 JobClient.runJob(jobConf); 363 Date end_time = new Date(); 364 System.out.println("Job ended: " + end_time); 365 System.out.println("The job took " + 366 (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); 367 368 // Check to ensure that the statistics of the 369 // framework's sort-input and sort-output match 370 SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs, 371 new Path(outputPath, "part-00000"), defaults); 372 IntWritable k1 = new IntWritable(); 373 IntWritable k2 = new IntWritable(); 374 RecordStatsWritable v1 = new RecordStatsWritable(); 375 RecordStatsWritable v2 = new RecordStatsWritable(); 376 if (!stats.next(k1, v1)) { 377 throw new IOException("Failed to read record #1 from reduce's output"); 378 } 379 if (!stats.next(k2, v2)) { 380 throw new IOException("Failed to read record #2 from reduce's output"); 381 } 382 383 if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) || 384 v1.getChecksum() != v2.getChecksum()) { 385 throw new IOException("(" + 386 v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" + 387 v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")"); 388 } 389 } 390 391 } 392 393 /** 394 * A simple map-reduce task to check if the input and the output 395 * of the framework's sort is consistent by ensuring each record 396 * is present in both the input and the output. 397 * 398 */ 399 public static class RecordChecker { 400 401 public static class Map extends MapReduceBase 402 implements Mapper<BytesWritable, BytesWritable, 403 BytesWritable, IntWritable> { 404 405 private IntWritable value = null; 406 configure(JobConf job)407 public void configure(JobConf job) { 408 // value == one for sort-input; value == two for sort-output 409 value = deduceInputFile(job); 410 } 411 map(BytesWritable key, BytesWritable value, OutputCollector<BytesWritable, IntWritable> output, Reporter reporter)412 public void map(BytesWritable key, 413 BytesWritable value, 414 OutputCollector<BytesWritable, IntWritable> output, 415 Reporter reporter) throws IOException { 416 // newKey = (key, value) 417 BytesWritable keyValue = new BytesWritable(pair(key, value)); 418 419 // output (newKey, value) 420 output.collect(keyValue, this.value); 421 } 422 } 423 424 public static class Reduce extends MapReduceBase 425 implements Reducer<BytesWritable, IntWritable, 426 BytesWritable, IntWritable> { 427 reduce(BytesWritable key, Iterator<IntWritable> values, OutputCollector<BytesWritable, IntWritable> output, Reporter reporter)428 public void reduce(BytesWritable key, Iterator<IntWritable> values, 429 OutputCollector<BytesWritable, IntWritable> output, 430 Reporter reporter) throws IOException { 431 int ones = 0; 432 int twos = 0; 433 while (values.hasNext()) { 434 IntWritable count = values.next(); 435 if (count.equals(sortInput)) { 436 ++ones; 437 } else if (count.equals(sortOutput)) { 438 ++twos; 439 } else { 440 throw new IOException("Invalid 'value' of " + count.get() + 441 " for (key,value): " + key.toString()); 442 } 443 } 444 445 // Check to ensure there are equal no. of ones and twos 446 if (ones != twos) { 447 throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos + 448 ") for (key, value): " + key.toString()); 449 } 450 } 451 } 452 checkRecords(Configuration defaults, int noMaps, int noReduces, Path sortInput, Path sortOutput)453 static void checkRecords(Configuration defaults, int noMaps, int noReduces, 454 Path sortInput, Path sortOutput) throws IOException { 455 JobConf jobConf = new JobConf(defaults, RecordChecker.class); 456 jobConf.setJobName("sortvalidate-record-checker"); 457 458 jobConf.setInputFormat(SequenceFileInputFormat.class); 459 jobConf.setOutputFormat(SequenceFileOutputFormat.class); 460 461 jobConf.setOutputKeyClass(BytesWritable.class); 462 jobConf.setOutputValueClass(IntWritable.class); 463 464 jobConf.setMapperClass(Map.class); 465 jobConf.setReducerClass(Reduce.class); 466 467 JobClient client = new JobClient(jobConf); 468 ClusterStatus cluster = client.getClusterStatus(); 469 if (noMaps == -1) { 470 noMaps = cluster.getTaskTrackers() * 471 jobConf.getInt("test.sortvalidate.maps_per_host", 10); 472 } 473 if (noReduces == -1) { 474 noReduces = (int) (cluster.getMaxReduceTasks() * 0.9); 475 String sortReduces = jobConf.get("test.sortvalidate.reduces_per_host"); 476 if (sortReduces != null) { 477 noReduces = cluster.getTaskTrackers() * 478 Integer.parseInt(sortReduces); 479 } 480 } 481 jobConf.setNumMapTasks(noMaps); 482 jobConf.setNumReduceTasks(noReduces); 483 484 FileInputFormat.setInputPaths(jobConf, sortInput); 485 FileInputFormat.addInputPath(jobConf, sortOutput); 486 Path outputPath = new Path("/tmp/sortvalidate/recordchecker"); 487 FileSystem fs = FileSystem.get(defaults); 488 if (fs.exists(outputPath)) { 489 fs.delete(outputPath, true); 490 } 491 FileOutputFormat.setOutputPath(jobConf, outputPath); 492 493 // Uncomment to run locally in a single process 494 //job_conf.set("mapred.job.tracker", "local"); 495 Path[] inputPaths = FileInputFormat.getInputPaths(jobConf); 496 System.out.println("\nSortValidator.RecordChecker: Running on " + 497 cluster.getTaskTrackers() + 498 " nodes to validate sort from " + 499 inputPaths[0] + ", " + 500 inputPaths[1] + " into " + 501 FileOutputFormat.getOutputPath(jobConf) + 502 " with " + noReduces + " reduces."); 503 Date startTime = new Date(); 504 System.out.println("Job started: " + startTime); 505 JobClient.runJob(jobConf); 506 Date end_time = new Date(); 507 System.out.println("Job ended: " + end_time); 508 System.out.println("The job took " + 509 (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); 510 } 511 } 512 513 514 /** 515 * The main driver for sort-validator program. 516 * Invoke this method to submit the map/reduce job. 517 * @throws IOException When there is communication problems with the 518 * job tracker. 519 */ run(String[] args)520 public int run(String[] args) throws Exception { 521 Configuration defaults = getConf(); 522 523 int noMaps = -1, noReduces = -1; 524 Path sortInput = null, sortOutput = null; 525 boolean deepTest = false; 526 for(int i=0; i < args.length; ++i) { 527 try { 528 if ("-m".equals(args[i])) { 529 noMaps = Integer.parseInt(args[++i]); 530 } else if ("-r".equals(args[i])) { 531 noReduces = Integer.parseInt(args[++i]); 532 } else if ("-sortInput".equals(args[i])){ 533 sortInput = new Path(args[++i]); 534 } else if ("-sortOutput".equals(args[i])){ 535 sortOutput = new Path(args[++i]); 536 } else if ("-deep".equals(args[i])) { 537 deepTest = true; 538 } else { 539 printUsage(); 540 return -1; 541 } 542 } catch (NumberFormatException except) { 543 System.err.println("ERROR: Integer expected instead of " + args[i]); 544 printUsage(); 545 return -1; 546 } catch (ArrayIndexOutOfBoundsException except) { 547 System.err.println("ERROR: Required parameter missing from " + 548 args[i-1]); 549 printUsage(); 550 return -1; 551 } 552 } 553 554 // Sanity check 555 if (sortInput == null || sortOutput == null) { 556 printUsage(); 557 return -2; 558 } 559 560 // Check if the records are consistent and sorted correctly 561 RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput); 562 563 // Check if the same records are present in sort's inputs & outputs 564 if (deepTest) { 565 RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput, 566 sortOutput); 567 } 568 569 System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" + 570 " successfully."); 571 572 return 0; 573 } 574 main(String[] args)575 public static void main(String[] args) throws Exception { 576 int res = ToolRunner.run(new Configuration(), new SortValidator(), args); 577 System.exit(res); 578 } 579 } 580