1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.examples.terasort;
20 
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.zip.Checksum;
27 
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.conf.Configured;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.io.LongWritable;
34 import org.apache.hadoop.io.NullWritable;
35 import org.apache.hadoop.io.Text;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.hadoop.io.WritableUtils;
38 import org.apache.hadoop.mapreduce.Cluster;
39 import org.apache.hadoop.mapreduce.Counter;
40 import org.apache.hadoop.mapreduce.InputFormat;
41 import org.apache.hadoop.mapreduce.InputSplit;
42 import org.apache.hadoop.mapreduce.Job;
43 import org.apache.hadoop.mapreduce.JobContext;
44 import org.apache.hadoop.mapreduce.MRJobConfig;
45 import org.apache.hadoop.mapreduce.Mapper;
46 import org.apache.hadoop.mapreduce.RecordReader;
47 import org.apache.hadoop.mapreduce.TaskAttemptContext;
48 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
49 import org.apache.hadoop.util.PureJavaCrc32;
50 import org.apache.hadoop.util.Tool;
51 import org.apache.hadoop.util.ToolRunner;
52 
53 /**
54  * Generate the official GraySort input data set.
55  * The user specifies the number of rows and the output directory and this
56  * class runs a map/reduce program to generate the data.
57  * The format of the data is:
58  * <ul>
59  * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid)
60  *     (constant 4 bytes) (48 bytes filler) (constant 4 bytes)
61  * <li>The rowid is the right justified row id as a hex number.
62  * </ul>
63  *
64  * <p>
65  * To run the program:
66  * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
67  */
68 public class TeraGen extends Configured implements Tool {
69   private static final Log LOG = LogFactory.getLog(TeraSort.class);
70 
71   public static enum Counters {CHECKSUM}
72 
73   public static final String NUM_ROWS = "mapreduce.terasort.num-rows";
74   /**
75    * An input format that assigns ranges of longs to each mapper.
76    */
77   static class RangeInputFormat
78       extends InputFormat<LongWritable, NullWritable> {
79 
80     /**
81      * An input split consisting of a range on numbers.
82      */
83     static class RangeInputSplit extends InputSplit implements Writable {
84       long firstRow;
85       long rowCount;
86 
RangeInputSplit()87       public RangeInputSplit() { }
88 
RangeInputSplit(long offset, long length)89       public RangeInputSplit(long offset, long length) {
90         firstRow = offset;
91         rowCount = length;
92       }
93 
getLength()94       public long getLength() throws IOException {
95         return 0;
96       }
97 
getLocations()98       public String[] getLocations() throws IOException {
99         return new String[]{};
100       }
101 
readFields(DataInput in)102       public void readFields(DataInput in) throws IOException {
103         firstRow = WritableUtils.readVLong(in);
104         rowCount = WritableUtils.readVLong(in);
105       }
106 
write(DataOutput out)107       public void write(DataOutput out) throws IOException {
108         WritableUtils.writeVLong(out, firstRow);
109         WritableUtils.writeVLong(out, rowCount);
110       }
111     }
112 
113     /**
114      * A record reader that will generate a range of numbers.
115      */
116     static class RangeRecordReader
117         extends RecordReader<LongWritable, NullWritable> {
118       long startRow;
119       long finishedRows;
120       long totalRows;
121       LongWritable key = null;
122 
RangeRecordReader()123       public RangeRecordReader() {
124       }
125 
initialize(InputSplit split, TaskAttemptContext context)126       public void initialize(InputSplit split, TaskAttemptContext context)
127           throws IOException, InterruptedException {
128         startRow = ((RangeInputSplit)split).firstRow;
129         finishedRows = 0;
130         totalRows = ((RangeInputSplit)split).rowCount;
131       }
132 
close()133       public void close() throws IOException {
134         // NOTHING
135       }
136 
getCurrentKey()137       public LongWritable getCurrentKey() {
138         return key;
139       }
140 
getCurrentValue()141       public NullWritable getCurrentValue() {
142         return NullWritable.get();
143       }
144 
getProgress()145       public float getProgress() throws IOException {
146         return finishedRows / (float) totalRows;
147       }
148 
nextKeyValue()149       public boolean nextKeyValue() {
150         if (key == null) {
151           key = new LongWritable();
152         }
153         if (finishedRows < totalRows) {
154           key.set(startRow + finishedRows);
155           finishedRows += 1;
156           return true;
157         } else {
158           return false;
159         }
160       }
161 
162     }
163 
164     public RecordReader<LongWritable, NullWritable>
createRecordReader(InputSplit split, TaskAttemptContext context)165         createRecordReader(InputSplit split, TaskAttemptContext context)
166         throws IOException {
167       return new RangeRecordReader();
168     }
169 
170     /**
171      * Create the desired number of splits, dividing the number of rows
172      * between the mappers.
173      */
getSplits(JobContext job)174     public List<InputSplit> getSplits(JobContext job) {
175       long totalRows = getNumberOfRows(job);
176       int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
177       LOG.info("Generating " + totalRows + " using " + numSplits);
178       List<InputSplit> splits = new ArrayList<InputSplit>();
179       long currentRow = 0;
180       for(int split = 0; split < numSplits; ++split) {
181         long goal =
182           (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
183         splits.add(new RangeInputSplit(currentRow, goal - currentRow));
184         currentRow = goal;
185       }
186       return splits;
187     }
188 
189   }
190 
getNumberOfRows(JobContext job)191   static long getNumberOfRows(JobContext job) {
192     return job.getConfiguration().getLong(NUM_ROWS, 0);
193   }
194 
setNumberOfRows(Job job, long numRows)195   static void setNumberOfRows(Job job, long numRows) {
196     job.getConfiguration().setLong(NUM_ROWS, numRows);
197   }
198 
199   /**
200    * The Mapper class that given a row number, will generate the appropriate
201    * output line.
202    */
203   public static class SortGenMapper
204       extends Mapper<LongWritable, NullWritable, Text, Text> {
205 
206     private Text key = new Text();
207     private Text value = new Text();
208     private Unsigned16 rand = null;
209     private Unsigned16 rowId = null;
210     private Unsigned16 checksum = new Unsigned16();
211     private Checksum crc32 = new PureJavaCrc32();
212     private Unsigned16 total = new Unsigned16();
213     private static final Unsigned16 ONE = new Unsigned16(1);
214     private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH +
215                                      TeraInputFormat.VALUE_LENGTH];
216     private Counter checksumCounter;
217 
map(LongWritable row, NullWritable ignored, Context context)218     public void map(LongWritable row, NullWritable ignored,
219         Context context) throws IOException, InterruptedException {
220       if (rand == null) {
221         rowId = new Unsigned16(row.get());
222         rand = Random16.skipAhead(rowId);
223         checksumCounter = context.getCounter(Counters.CHECKSUM);
224       }
225       Random16.nextRand(rand);
226       GenSort.generateRecord(buffer, rand, rowId);
227       key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
228       value.set(buffer, TeraInputFormat.KEY_LENGTH,
229                 TeraInputFormat.VALUE_LENGTH);
230       context.write(key, value);
231       crc32.reset();
232       crc32.update(buffer, 0,
233                    TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
234       checksum.set(crc32.getValue());
235       total.add(checksum);
236       rowId.add(ONE);
237     }
238 
239     @Override
cleanup(Context context)240     public void cleanup(Context context) {
241       if (checksumCounter != null) {
242         checksumCounter.increment(total.getLow8());
243       }
244     }
245   }
246 
usage()247   private static void usage() throws IOException {
248     System.err.println("teragen <num rows> <output dir>");
249   }
250 
251   /**
252    * Parse a number that optionally has a postfix that denotes a base.
253    * @param str an string integer with an option base {k,m,b,t}.
254    * @return the expanded value
255    */
parseHumanLong(String str)256   private static long parseHumanLong(String str) {
257     char tail = str.charAt(str.length() - 1);
258     long base = 1;
259     switch (tail) {
260     case 't':
261       base *= 1000 * 1000 * 1000 * 1000;
262       break;
263     case 'b':
264       base *= 1000 * 1000 * 1000;
265       break;
266     case 'm':
267       base *= 1000 * 1000;
268       break;
269     case 'k':
270       base *= 1000;
271       break;
272     default:
273     }
274     if (base != 1) {
275       str = str.substring(0, str.length() - 1);
276     }
277     return Long.parseLong(str) * base;
278   }
279 
280   /**
281    * @param args the cli arguments
282    */
run(String[] args)283   public int run(String[] args)
284       throws IOException, InterruptedException, ClassNotFoundException {
285     Job job = Job.getInstance(getConf());
286     if (args.length != 2) {
287       usage();
288       return 2;
289     }
290     setNumberOfRows(job, parseHumanLong(args[0]));
291     Path outputDir = new Path(args[1]);
292     FileOutputFormat.setOutputPath(job, outputDir);
293     job.setJobName("TeraGen");
294     job.setJarByClass(TeraGen.class);
295     job.setMapperClass(SortGenMapper.class);
296     job.setNumReduceTasks(0);
297     job.setOutputKeyClass(Text.class);
298     job.setOutputValueClass(Text.class);
299     job.setInputFormatClass(RangeInputFormat.class);
300     job.setOutputFormatClass(TeraOutputFormat.class);
301     return job.waitForCompletion(true) ? 0 : 1;
302   }
303 
main(String[] args)304   public static void main(String[] args) throws Exception {
305     int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
306     System.exit(res);
307   }
308 }
309