/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples.terasort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Checksum;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 49 import org.apache.hadoop.util.PureJavaCrc32; 50 import org.apache.hadoop.util.Tool; 51 import org.apache.hadoop.util.ToolRunner; 52 53 /** 54 * Generate the official GraySort input data set. 55 * The user specifies the number of rows and the output directory and this 56 * class runs a map/reduce program to generate the data. 57 * The format of the data is: 58 * <ul> 59 * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid) 60 * (constant 4 bytes) (48 bytes filler) (constant 4 bytes) 61 * <li>The rowid is the right justified row id as a hex number. 62 * </ul> 63 * 64 * <p> 65 * To run the program: 66 * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b> 67 */ 68 public class TeraGen extends Configured implements Tool { 69 private static final Log LOG = LogFactory.getLog(TeraSort.class); 70 71 public static enum Counters {CHECKSUM} 72 73 public static final String NUM_ROWS = "mapreduce.terasort.num-rows"; 74 /** 75 * An input format that assigns ranges of longs to each mapper. 76 */ 77 static class RangeInputFormat 78 extends InputFormat<LongWritable, NullWritable> { 79 80 /** 81 * An input split consisting of a range on numbers. 
82 */ 83 static class RangeInputSplit extends InputSplit implements Writable { 84 long firstRow; 85 long rowCount; 86 RangeInputSplit()87 public RangeInputSplit() { } 88 RangeInputSplit(long offset, long length)89 public RangeInputSplit(long offset, long length) { 90 firstRow = offset; 91 rowCount = length; 92 } 93 getLength()94 public long getLength() throws IOException { 95 return 0; 96 } 97 getLocations()98 public String[] getLocations() throws IOException { 99 return new String[]{}; 100 } 101 readFields(DataInput in)102 public void readFields(DataInput in) throws IOException { 103 firstRow = WritableUtils.readVLong(in); 104 rowCount = WritableUtils.readVLong(in); 105 } 106 write(DataOutput out)107 public void write(DataOutput out) throws IOException { 108 WritableUtils.writeVLong(out, firstRow); 109 WritableUtils.writeVLong(out, rowCount); 110 } 111 } 112 113 /** 114 * A record reader that will generate a range of numbers. 115 */ 116 static class RangeRecordReader 117 extends RecordReader<LongWritable, NullWritable> { 118 long startRow; 119 long finishedRows; 120 long totalRows; 121 LongWritable key = null; 122 RangeRecordReader()123 public RangeRecordReader() { 124 } 125 initialize(InputSplit split, TaskAttemptContext context)126 public void initialize(InputSplit split, TaskAttemptContext context) 127 throws IOException, InterruptedException { 128 startRow = ((RangeInputSplit)split).firstRow; 129 finishedRows = 0; 130 totalRows = ((RangeInputSplit)split).rowCount; 131 } 132 close()133 public void close() throws IOException { 134 // NOTHING 135 } 136 getCurrentKey()137 public LongWritable getCurrentKey() { 138 return key; 139 } 140 getCurrentValue()141 public NullWritable getCurrentValue() { 142 return NullWritable.get(); 143 } 144 getProgress()145 public float getProgress() throws IOException { 146 return finishedRows / (float) totalRows; 147 } 148 nextKeyValue()149 public boolean nextKeyValue() { 150 if (key == null) { 151 key = new LongWritable(); 152 } 153 
if (finishedRows < totalRows) { 154 key.set(startRow + finishedRows); 155 finishedRows += 1; 156 return true; 157 } else { 158 return false; 159 } 160 } 161 162 } 163 164 public RecordReader<LongWritable, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)165 createRecordReader(InputSplit split, TaskAttemptContext context) 166 throws IOException { 167 return new RangeRecordReader(); 168 } 169 170 /** 171 * Create the desired number of splits, dividing the number of rows 172 * between the mappers. 173 */ getSplits(JobContext job)174 public List<InputSplit> getSplits(JobContext job) { 175 long totalRows = getNumberOfRows(job); 176 int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1); 177 LOG.info("Generating " + totalRows + " using " + numSplits); 178 List<InputSplit> splits = new ArrayList<InputSplit>(); 179 long currentRow = 0; 180 for(int split = 0; split < numSplits; ++split) { 181 long goal = 182 (long) Math.ceil(totalRows * (double)(split + 1) / numSplits); 183 splits.add(new RangeInputSplit(currentRow, goal - currentRow)); 184 currentRow = goal; 185 } 186 return splits; 187 } 188 189 } 190 getNumberOfRows(JobContext job)191 static long getNumberOfRows(JobContext job) { 192 return job.getConfiguration().getLong(NUM_ROWS, 0); 193 } 194 setNumberOfRows(Job job, long numRows)195 static void setNumberOfRows(Job job, long numRows) { 196 job.getConfiguration().setLong(NUM_ROWS, numRows); 197 } 198 199 /** 200 * The Mapper class that given a row number, will generate the appropriate 201 * output line. 
202 */ 203 public static class SortGenMapper 204 extends Mapper<LongWritable, NullWritable, Text, Text> { 205 206 private Text key = new Text(); 207 private Text value = new Text(); 208 private Unsigned16 rand = null; 209 private Unsigned16 rowId = null; 210 private Unsigned16 checksum = new Unsigned16(); 211 private Checksum crc32 = new PureJavaCrc32(); 212 private Unsigned16 total = new Unsigned16(); 213 private static final Unsigned16 ONE = new Unsigned16(1); 214 private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH + 215 TeraInputFormat.VALUE_LENGTH]; 216 private Counter checksumCounter; 217 map(LongWritable row, NullWritable ignored, Context context)218 public void map(LongWritable row, NullWritable ignored, 219 Context context) throws IOException, InterruptedException { 220 if (rand == null) { 221 rowId = new Unsigned16(row.get()); 222 rand = Random16.skipAhead(rowId); 223 checksumCounter = context.getCounter(Counters.CHECKSUM); 224 } 225 Random16.nextRand(rand); 226 GenSort.generateRecord(buffer, rand, rowId); 227 key.set(buffer, 0, TeraInputFormat.KEY_LENGTH); 228 value.set(buffer, TeraInputFormat.KEY_LENGTH, 229 TeraInputFormat.VALUE_LENGTH); 230 context.write(key, value); 231 crc32.reset(); 232 crc32.update(buffer, 0, 233 TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH); 234 checksum.set(crc32.getValue()); 235 total.add(checksum); 236 rowId.add(ONE); 237 } 238 239 @Override cleanup(Context context)240 public void cleanup(Context context) { 241 if (checksumCounter != null) { 242 checksumCounter.increment(total.getLow8()); 243 } 244 } 245 } 246 usage()247 private static void usage() throws IOException { 248 System.err.println("teragen <num rows> <output dir>"); 249 } 250 251 /** 252 * Parse a number that optionally has a postfix that denotes a base. 253 * @param str an string integer with an option base {k,m,b,t}. 
254 * @return the expanded value 255 */ parseHumanLong(String str)256 private static long parseHumanLong(String str) { 257 char tail = str.charAt(str.length() - 1); 258 long base = 1; 259 switch (tail) { 260 case 't': 261 base *= 1000 * 1000 * 1000 * 1000; 262 break; 263 case 'b': 264 base *= 1000 * 1000 * 1000; 265 break; 266 case 'm': 267 base *= 1000 * 1000; 268 break; 269 case 'k': 270 base *= 1000; 271 break; 272 default: 273 } 274 if (base != 1) { 275 str = str.substring(0, str.length() - 1); 276 } 277 return Long.parseLong(str) * base; 278 } 279 280 /** 281 * @param args the cli arguments 282 */ run(String[] args)283 public int run(String[] args) 284 throws IOException, InterruptedException, ClassNotFoundException { 285 Job job = Job.getInstance(getConf()); 286 if (args.length != 2) { 287 usage(); 288 return 2; 289 } 290 setNumberOfRows(job, parseHumanLong(args[0])); 291 Path outputDir = new Path(args[1]); 292 FileOutputFormat.setOutputPath(job, outputDir); 293 job.setJobName("TeraGen"); 294 job.setJarByClass(TeraGen.class); 295 job.setMapperClass(SortGenMapper.class); 296 job.setNumReduceTasks(0); 297 job.setOutputKeyClass(Text.class); 298 job.setOutputValueClass(Text.class); 299 job.setInputFormatClass(RangeInputFormat.class); 300 job.setOutputFormatClass(TeraOutputFormat.class); 301 return job.waitForCompletion(true) ? 0 : 1; 302 } 303 main(String[] args)304 public static void main(String[] args) throws Exception { 305 int res = ToolRunner.run(new Configuration(), new TeraGen(), args); 306 System.exit(res); 307 } 308 } 309