/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This program uses map/reduce to just run a distributed job where there is
 * no interaction between the tasks and each task writes a large unsorted
 * random binary sequence file of BytesWritable.
 * In order for this program to generate data for terasort with 10-byte keys
 * and 90-byte values, have the following config:
 * <pre>{@code
 * <?xml version="1.0"?>
 * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 * <configuration>
 *   <property>
 *     <name>mapreduce.randomwriter.minkey</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.maxkey</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.minvalue</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.maxvalue</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>mapreduce.randomwriter.totalbytes</name>
 *     <value>1099511627776</value>
 *   </property>
 * </configuration>}</pre>
 * Key and value lengths are drawn uniformly from {@code [min, max)}: the
 * maximum is exclusive, except that when min equals max every record uses
 * exactly that length (as in the example above).
 * <p>
 * Equivalently, {@link RandomWriter} also supports all the above options
 * and ones supported by {@link GenericOptionsParser} via the command-line.
 */
public class RandomWriter extends Configured implements Tool {
  public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes";
  public static final String BYTES_PER_MAP =
    "mapreduce.randomwriter.bytespermap";
  public static final String MAPS_PER_HOST =
    "mapreduce.randomwriter.mapsperhost";
  public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue";
  public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue";
  public static final String MIN_KEY = "mapreduce.randomwriter.minkey";
  public static final String MAX_KEY = "mapreduce.randomwriter.maxkey";

  /** Default number of bytes written by each map (1 GiB). */
  private static final long DEFAULT_BYTES_PER_MAP = 1024L * 1024 * 1024;

  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

  /**
   * A custom input format that creates virtual inputs of a single string
   * for each map. No input data is ever read: each split only carries a
   * placeholder path whose file name becomes the map's single input key.
   */
  static class RandomInputFormat extends InputFormat<Text, Text> {

    /**
     * Generate the requested number of file splits, one per map. The count
     * comes from {@link MRJobConfig#NUM_MAPS} (default 1); split {@code i}
     * points at {@code <output dir>/dummy-split-<i>}, which is never opened.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> result = new ArrayList<InputSplit>();
      Path outDir = FileOutputFormat.getOutputPath(job);
      int numSplits =
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
      for (int i = 0; i < numSplits; ++i) {
        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
                                 (String[]) null));
      }
      return result;
    }

    /**
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader extends RecordReader<Text, Text> {
      // Path of the split; cleared once its single record has been returned.
      Path name;
      Text key = null;
      Text value = new Text();

      public RandomRecordReader(Path p) {
        name = p;
      }

      @Override
      public void initialize(InputSplit split,
                             TaskAttemptContext context)
      throws IOException, InterruptedException {
        // Nothing to open: the path was supplied to the constructor.
      }

      @Override
      public boolean nextKeyValue() {
        if (name != null) {
          key = new Text();
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }

      @Override
      public Text getCurrentKey() {
        return key;
      }

      @Override
      public Text getCurrentValue() {
        return value;
      }

      @Override
      public void close() {}

      @Override
      public float getProgress() {
        return 0.0f;
      }
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  /**
   * Mapper that ignores its input and writes random key/value pairs until
   * its per-map byte budget ({@link #BYTES_PER_MAP}) is used up.
   */
  static class RandomMapper extends Mapper<WritableComparable, Writable,
                      BytesWritable, BytesWritable> {

    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();

    /** Fill {@code data[offset, offset + length)} with random bytes. */
    private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    /**
     * Given an output filename, write a bunch of random records to it.
     * The input key and value are ignored; records are emitted until the
     * remaining byte budget drops to zero or below.
     */
    @Override
    public void map(WritableComparable key,
                    Writable value,
                    Context context) throws IOException, InterruptedException {
      // Counters are loop-invariant; look them up once.
      Counter bytesWritten = context.getCounter(Counters.BYTES_WRITTEN);
      Counter recordsWritten = context.getCounter(Counters.RECORDS_WRITTEN);
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize +
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        context.write(randomKey, randomValue);
        // Sum in long so two large lengths cannot overflow int.
        long recordLength = (long) keyLength + valueLength;
        numBytesToWrite -= recordLength;
        bytesWritten.increment(recordLength);
        recordsWritten.increment(1);
        if (++itemCount % 200 == 0) {
          context.setStatus("wrote record " + itemCount + ". " +
                            numBytesToWrite + " bytes left.");
        }
      }
      context.setStatus("done with " + itemCount + " records.");
    }

    /**
     * Save the values out of the configuration that we need to write
     * the data, and reject settings that cannot work.
     *
     * @throws IllegalArgumentException if a minimum size is negative, a
     *         maximum is below its minimum, or every possible record would
     *         be empty while bytes remain to be written (the write loop in
     *         {@link #map} would then never terminate)
     */
    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      numBytesToWrite = conf.getLong(BYTES_PER_MAP, DEFAULT_BYTES_PER_MAP);
      minKeySize = conf.getInt(MIN_KEY, 10);
      int maxKeySize = conf.getInt(MAX_KEY, 1000);
      minValueSize = conf.getInt(MIN_VALUE, 0);
      int maxValueSize = conf.getInt(MAX_VALUE, 20000);
      checkSizeRange(MIN_KEY, minKeySize, MAX_KEY, maxKeySize);
      checkSizeRange(MIN_VALUE, minValueSize, MAX_VALUE, maxValueSize);
      keySizeRange = maxKeySize - minKeySize;
      valueSizeRange = maxValueSize - minValueSize;
      // Random.nextInt(range) is exclusive, so the longest possible record
      // is min + (range - 1) for each of key and value.
      long longestRecord = (long) minKeySize + Math.max(keySizeRange - 1, 0)
          + minValueSize + Math.max(valueSizeRange - 1, 0);
      if (numBytesToWrite > 0 && longestRecord == 0) {
        throw new IllegalArgumentException("Key and value size settings ("
            + MIN_KEY + ", " + MAX_KEY + ", " + MIN_VALUE + ", " + MAX_VALUE
            + ") only allow empty records, so no data can be written");
      }
    }

    /** Require {@code 0 <= min <= max} for a configured size range. */
    private static void checkSizeRange(String minName, int min,
                                       String maxName, int max) {
      if (min < 0 || max < min) {
        throw new IllegalArgumentException("Invalid size range: " + minName
            + "=" + min + ", " + maxName + "=" + max
            + " (need 0 <= min <= max)");
      }
    }
  }

  /**
   * This is the main routine for launching a distributed random write job.
   * It runs {@link #MAPS_PER_HOST} maps (default 10) per task tracker, each
   * writing {@link #BYTES_PER_MAP} bytes (default 1 GiB), unless
   * {@link #TOTAL_BYTES} overrides the total. If the total is positive but
   * smaller than one map's share, a single map writes the whole total.
   * The job is map-only: no reduces are run.
   *
   * @param args {@code args[0]} is the output directory
   * @return 0 on success, 1 if the job fails, 2 if no output directory was
   *         given, -2 if the size settings are invalid
   * @throws Exception propagated from the job client or job submission
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return 2;
    }

    Path outDir = new Path(args[0]);
    Configuration conf = getConf();
    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
                                              DEFAULT_BYTES_PER_MAP);
    // A non-positive per-map size would give a negative or meaningless
    // map count; fail before contacting the cluster.
    if (numBytesToWritePerMap <= 0) {
      System.err.println("Cannot have " + BYTES_PER_MAP + " set to "
          + numBytesToWritePerMap + "; it must be positive");
      return -2;
    }
    int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
    JobClient client = new JobClient(conf);
    ClusterStatus cluster = client.getClusterStatus();
    long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
        numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
    if (totalBytesToWrite < 0) {
      System.err.println("Total bytes to write (" + totalBytesToWrite
          + ") must not be negative; check " + TOTAL_BYTES + " and "
          + MAPS_PER_HOST);
      return -2;
    }
    long mapCount = totalBytesToWrite / numBytesToWritePerMap;
    // NUM_MAPS is an int; refuse counts the cast would silently truncate.
    if (mapCount > Integer.MAX_VALUE) {
      System.err.println("Too many maps (" + mapCount + "); increase "
          + BYTES_PER_MAP);
      return -2;
    }
    int numMaps = (int) mapCount;
    if (numMaps == 0 && totalBytesToWrite > 0) {
      numMaps = 1;
      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
    }
    conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

    Job job = Job.getInstance(conf);

    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setInputFormatClass(RandomInputFormat.class);
    job.setMapperClass(RandomMapper.class);
    job.setReducerClass(Reducer.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    System.out.println("Running " + numMaps + " maps.");

    // reducer NONE
    job.setNumReduceTasks(0);

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    int ret = job.waitForCompletion(true) ? 0 : 1;
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " +
                       (endTime.getTime() - startTime.getTime()) / 1000 +
                       " seconds.");

    return ret;
  }

  /** Command-line entry point; exits with the value returned by {@link #run}. */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
    System.exit(res);
  }

}