/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This program uses map/reduce to run a distributed job where there is
 * no interaction between the tasks and each task writes a large unsorted
 * random binary sequence file of BytesWritable.
 * In order for this program to generate data for terasort with 10-byte keys
 * and 90-byte values, use the following config:
 * <pre>{@code
 * <?xml version="1.0"?>
 * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 * <configuration>
 *   <property>
 *     <name>mapreduce.randomwriter.minkey</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.maxkey</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.minvalue</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.maxvalue</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.totalbytes</name>
 *     <value>1099511627776</value>
 *   </property>
 * </configuration>}</pre>
 * Equivalently, {@link RandomWriter} also supports all the above options
 * and ones supported by {@link GenericOptionsParser} via the command-line.
 */
public class RandomWriter extends Configured implements Tool {
  public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes";
  public static final String BYTES_PER_MAP =
    "mapreduce.randomwriter.bytespermap";
  public static final String MAPS_PER_HOST =
    "mapreduce.randomwriter.mapsperhost";
  public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue";
  public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue";
  public static final String MIN_KEY = "mapreduce.randomwriter.minkey";
  public static final String MAX_KEY = "mapreduce.randomwriter.maxkey";
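
  /*
   * Example invocation (a sketch, not a committed interface): any of the keys
   * above can also be supplied on the command line through the generic -D
   * option handled by GenericOptionsParser. The jar name and the
   * "randomwriter" driver alias below are assumptions that depend on how this
   * class is packaged and registered with an example driver:
   *
   *   hadoop jar hadoop-mapreduce-examples.jar randomwriter \
   *     -D mapreduce.randomwriter.bytespermap=1073741824 \
   *     -D mapreduce.randomwriter.mapsperhost=10 \
   *     <out-dir>
   */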

  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

  /**
   * A custom input format that creates virtual inputs of a single string
   * for each map.
   */
  static class RandomInputFormat extends InputFormat<Text, Text> {

    /**
     * Generate the requested number of file splits, with the filename
     * set to the filename of the output file.
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> result = new ArrayList<InputSplit>();
      Path outDir = FileOutputFormat.getOutputPath(job);
      int numSplits =
            job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
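      // One synthetic FileSplit per requested map task: the dummy path only
      // gives each map a unique name under the output directory; nothing is
      // ever read from it (see RandomRecordReader below).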
      for(int i=0; i < numSplits; ++i) {
        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
                                  (String[])null));
      }
      return result;
    }

    /**
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader extends RecordReader<Text, Text> {
      Path name;
      Text key = null;
      Text value = new Text();
      public RandomRecordReader(Path p) {
        name = p;
      }

      public void initialize(InputSplit split,
                             TaskAttemptContext context)
      throws IOException, InterruptedException {

      }

      public boolean nextKeyValue() {
        if (name != null) {
          key = new Text();
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }

      public Text getCurrentKey() {
        return key;
      }

      public Text getCurrentValue() {
        return value;
      }

      public void close() {}

      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  static class RandomMapper extends Mapper<WritableComparable, Writable,
                      BytesWritable, BytesWritable> {

    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();

    private void randomizeBytes(byte[] data, int offset, int length) {
      for(int i=offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    /**
     * Given an output filename, write a bunch of random records to it.
     */
    public void map(WritableComparable key,
                    Writable value,
                    Context context) throws IOException, InterruptedException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize +
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        context.write(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
        if (++itemCount % 200 == 0) {
          context.setStatus("wrote record " + itemCount + ". " +
                             numBytesToWrite + " bytes left.");
        }
      }
      context.setStatus("done with " + itemCount + " records.");
    }

    /**
     * Save the values out of the configuration that we need to write
     * the data.
     */
    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
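      // Defaults when the mapreduce.randomwriter.* keys are unset: 1 GB of
      // data per map, key sizes between 10 and 1000 bytes, and value sizes
      // between 0 and 20000 bytes.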
      numBytesToWrite = conf.getLong(BYTES_PER_MAP,
                                    1*1024*1024*1024);
      minKeySize = conf.getInt(MIN_KEY, 10);
      keySizeRange =
        conf.getInt(MAX_KEY, 1000) - minKeySize;
      minValueSize = conf.getInt(MIN_VALUE, 0);
      valueSizeRange =
        conf.getInt(MAX_VALUE, 20000) - minValueSize;
    }
  }

  /**
   * This is the main routine for launching a distributed random write job.
   * By default it runs 10 maps per node, and each map writes 1 GB of data
   * to a DFS file.
   * The reduce doesn't do anything.
   *
   * @throws IOException
   */
  public int run(String[] args) throws Exception {
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return 2;
    }

    Path outDir = new Path(args[0]);
    Configuration conf = getConf();
    JobClient client = new JobClient(conf);
    ClusterStatus cluster = client.getClusterStatus();
    int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
                                             1*1024*1024*1024);
    if (numBytesToWritePerMap == 0) {
      System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0");
      return -2;
    }
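    // Sizing: unless mapreduce.randomwriter.totalbytes is set explicitly, the
    // job writes numMapsPerHost * numBytesToWritePerMap bytes per tasktracker
    // in the cluster, and the number of map tasks is that total divided by
    // the per-map byte count.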
    long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
      numMaps = 1;
      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
    }
    conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

    Job job = Job.getInstance(conf);

    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setInputFormatClass(RandomInputFormat.class);
    job.setMapperClass(RandomMapper.class);
    job.setReducerClass(Reducer.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    System.out.println("Running " + numMaps + " maps.");

    // reducer NONE
    job.setNumReduceTasks(0);

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    int ret = job.waitForCompletion(true) ? 0 : 1;
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " +
                       (endTime.getTime() - startTime.getTime()) / 1000 +
                       " seconds.");

    return ret;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
    System.exit(res);
  }

}