/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.aggregate;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * This is the main class for creating a map/reduce job using the Aggregate
 * framework. Aggregate is a specialization of the map/reduce framework for
 * performing various simple aggregations.
 *
 * Generally speaking, in order to implement an application using the
 * Map/Reduce model, the developer implements Map and Reduce functions (and
 * possibly a combine function). However, a lot of applications related to
 * counting and statistics computing have very similar characteristics.
 * Aggregate abstracts out the general patterns of these functions and
 * implements those patterns. In particular, the package provides generic
 * mapper/reducer/combiner classes, a set of built-in value aggregators, and
 * a generic utility class that helps users create map/reduce jobs using the
 * generic classes. The built-in aggregators include:
 *
 * <ul>
 * <li>sum over numeric values</li>
 * <li>count the number of distinct values</li>
 * <li>compute the histogram of values</li>
 * <li>compute the minimum, maximum, median, average, and standard deviation
 * of numeric values</li>
 * </ul>
 *
 * The developer using Aggregate only needs to provide a plugin class
 * conforming to the following interface:
 *
 * <pre>{@code
 * public interface ValueAggregatorDescriptor {
 *   public ArrayList<Entry> generateKeyValPairs(Object key, Object value);
 *   public void configure(Configuration conf);
 * }
 * }</pre>
 *
 * The package also provides a base class, ValueAggregatorBaseDescriptor,
 * implementing the above interface. The user can extend the base class and
 * implement generateKeyValPairs accordingly, as in the sketch below.
 *
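 * For illustration, a minimal descriptor that counts word occurrences might
 * look like the following sketch (the class name WordCountDescriptor is
 * hypothetical; LONG_VALUE_SUM, ONE, and generateEntry are inherited from
 * the aggregate package):
 *
 * <pre>{@code
 * public class WordCountDescriptor extends ValueAggregatorBaseDescriptor {
 *   public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
 *       Object val) {
 *     ArrayList<Entry<Text, Text>> ret = new ArrayList<Entry<Text, Text>>();
 *     // emit one "add 1 to the sum for this word" request per word
 *     for (String word : val.toString().split("\\s+")) {
 *       ret.add(generateEntry(LONG_VALUE_SUM, word, ONE));
 *     }
 *     return ret;
 *   }
 * }
 * }</pre>
 *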
 * The primary work of generateKeyValPairs is to emit one or more key/value
 * pairs based on the input key/value pair. The key in an output key/value
 * pair encodes two pieces of information: the aggregation type and the
 * aggregation id. The value is aggregated onto the aggregation id according
 * to the aggregation type.
 *
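 * For example, under the sketch above, the pair emitted for the word
 * "apple" would have the key "LongValueSum:apple" (type and id joined by
 * the TYPE_SEPARATOR constant) and the value "1", which is added to the
 * running sum kept for the id "apple".
 *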
 * This class offers a function to generate a map/reduce job using the
 * Aggregate framework. The function takes the following parameters:
 * <ul>
 * <li>input directory spec</li>
 * <li>input format (text or sequence file)</li>
 * <li>output directory</li>
 * <li>a file specifying the user plugin class</li>
 * </ul>
 *
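 * For example, a hypothetical invocation with two reducers, text input, a
 * user spec file, and a job name would pass the remaining arguments below
 * (this mirrors the usage string printed by createValueAggregatorJob; the
 * paths are placeholders):
 *
 * <pre>
 * inputDirs outDir 2 textinputformat spec.xml myAggJob
 * </pre>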
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ValueAggregatorJob {

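  /**
   * Create a JobControl containing a single Aggregate based map/reduce job,
   * configured with the given user-defined descriptor classes.
   *
   * @param args the arguments used for job creation
   * @param descriptors the user-defined value aggregator descriptor classes
   * @return a JobControl wrapping the created job
   * @throws IOException if the job cannot be created
   */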
  public static JobControl createValueAggregatorJobs(String args[],
    Class<? extends ValueAggregatorDescriptor>[] descriptors)
  throws IOException {

    JobControl theControl = new JobControl("ValueAggregatorJobs");
    ArrayList<ControlledJob> dependingJobs = new ArrayList<ControlledJob>();
    Configuration conf = new Configuration();
    if (descriptors != null) {
      conf = setAggregatorDescriptors(descriptors);
    }
    Job job = createValueAggregatorJob(conf, args);
    ControlledJob cjob = new ControlledJob(job, dependingJobs);
    theControl.addJob(cjob);
    return theControl;
  }

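  /**
   * Create a JobControl containing a single Aggregate based map/reduce job,
   * using the descriptors named in the job's spec file.
   *
   * @param args the arguments used for job creation
   * @return a JobControl wrapping the created job
   * @throws IOException if the job cannot be created
   */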
  public static JobControl createValueAggregatorJobs(String args[])
      throws IOException {
    return createValueAggregatorJobs(args, null);
  }

  /**
   * Create an Aggregate based map/reduce job.
   *
   * @param conf the configuration for the job
   * @param args the arguments used for job creation. Generic hadoop
   * arguments are accepted.
   * @return a Job object ready for submission.
   *
   * @throws IOException if the job cannot be created
   * @see GenericOptionsParser
   */
  public static Job createValueAggregatorJob(Configuration conf, String args[])
      throws IOException {

    GenericOptionsParser genericParser
      = new GenericOptionsParser(conf, args);
    args = genericParser.getRemainingArgs();

    if (args.length < 2) {
      System.out.println("usage: inputDirs outDir "
          + "[numOfReducer [textinputformat|seq [specfile [jobName]]]]");
      GenericOptionsParser.printGenericCommandUsage(System.out);
      System.exit(2);
    }
    String inputDir = args[0];
    String outputDir = args[1];
    int numOfReducers = 1;
    if (args.length > 2) {
      numOfReducers = Integer.parseInt(args[2]);
    }

    // default to sequence-file input unless "textinputformat" is requested
    Class<? extends InputFormat> theInputFormat;
    if (args.length > 3 && "textinputformat".equalsIgnoreCase(args[3])) {
      theInputFormat = TextInputFormat.class;
    } else {
      theInputFormat = SequenceFileInputFormat.class;
    }

    Path specFile = null;

    if (args.length > 4) {
      specFile = new Path(args[4]);
    }

    String jobName = "";

    if (args.length > 5) {
      jobName = args[5];
    }

    // the spec file is a Configuration resource naming the user's
    // descriptor classes (and, optionally, the jar that contains them)
    if (specFile != null) {
      conf.addResource(specFile);
    }
    String userJarFile = conf.get(ValueAggregatorJobBase.USER_JAR);
    if (userJarFile != null) {
      conf.set(MRJobConfig.JAR, userJarFile);
    }

    Job theJob = Job.getInstance(conf);
    if (userJarFile == null) {
      theJob.setJarByClass(ValueAggregator.class);
    }
    theJob.setJobName("ValueAggregatorJob: " + jobName);

    FileInputFormat.addInputPaths(theJob, inputDir);

    theJob.setInputFormatClass(theInputFormat);

    theJob.setMapperClass(ValueAggregatorMapper.class);
    FileOutputFormat.setOutputPath(theJob, new Path(outputDir));
    theJob.setOutputFormatClass(TextOutputFormat.class);
    theJob.setMapOutputKeyClass(Text.class);
    theJob.setMapOutputValueClass(Text.class);
    theJob.setOutputKeyClass(Text.class);
    theJob.setOutputValueClass(Text.class);
    theJob.setReducerClass(ValueAggregatorReducer.class);
    theJob.setCombinerClass(ValueAggregatorCombiner.class);
    theJob.setNumReduceTasks(numOfReducers);
    return theJob;
  }

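  /**
   * Create an Aggregate based map/reduce job, configured with the given
   * user-defined descriptor classes.
   *
   * @param args the arguments used for job creation
   * @param descriptors the user-defined value aggregator descriptor classes
   * @return a Job object ready for submission.
   * @throws IOException if the job cannot be created
   */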
  public static Job createValueAggregatorJob(String args[],
      Class<? extends ValueAggregatorDescriptor>[] descriptors)
      throws IOException {
    return createValueAggregatorJob(
             setAggregatorDescriptors(descriptors), args);
  }

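  /**
   * Build a Configuration recording the number and class names of the given
   * descriptors, so the framework can instantiate them at run time.
   *
   * @param descriptors the user-defined value aggregator descriptor classes
   * @return a Configuration with the descriptors registered
   */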
  public static Configuration setAggregatorDescriptors(
      Class<? extends ValueAggregatorDescriptor>[] descriptors) {
    Configuration conf = new Configuration();
    conf.setInt(ValueAggregatorJobBase.DESCRIPTOR_NUM, descriptors.length);
    // specify the aggregator descriptors
    for (int i = 0; i < descriptors.length; i++) {
      conf.set(ValueAggregatorJobBase.DESCRIPTOR + i,
               "UserDefined," + descriptors[i].getName());
    }
    return conf;
  }

  /**
   * Create and run an Aggregate based map/reduce job.
   *
   * @param args the arguments used for job creation
   * @throws IOException if the job cannot be created
   * @throws InterruptedException if the job is interrupted while running
   * @throws ClassNotFoundException if a required job class cannot be found
   */
  public static void main(String args[])
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = ValueAggregatorJob.createValueAggregatorJob(
                new Configuration(), args);
    int ret = job.waitForCompletion(true) ? 0 : 1;
    System.exit(ret);
  }
}