/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.aggregate;

import java.util.ArrayList;
import java.util.Map.Entry;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;

/**
 * This class implements the common functionality shared by the
 * implementations of the ValueAggregatorDescriptor interface.
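 *
 * <p>A minimal sketch of a user plug-in built on this base class (the class
 * name, the whitespace tokenization, and the choice of LONG_VALUE_SUM are
 * illustrative assumptions, not part of this API):
 * <pre>
 *   public class WordCountDescriptor extends ValueAggregatorBaseDescriptor {
 *     public ArrayList&lt;Entry&lt;Text, Text&gt;&gt; generateKeyValPairs(
 *         Object key, Object val) {
 *       ArrayList&lt;Entry&lt;Text, Text&gt;&gt; retv =
 *           new ArrayList&lt;Entry&lt;Text, Text&gt;&gt;();
 *       // emit one LongValueSum aggregation per word in the input value
 *       for (String word : val.toString().split("\\s+")) {
 *         retv.add(generateEntry(LONG_VALUE_SUM, word, ONE));
 *       }
 *       return retv;
 *     }
 *   }
 * </pre>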
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ValueAggregatorBaseDescriptor
    implements ValueAggregatorDescriptor {

  static public final String UNIQ_VALUE_COUNT = "UniqValueCount";

  static public final String LONG_VALUE_SUM = "LongValueSum";

  static public final String DOUBLE_VALUE_SUM = "DoubleValueSum";

  static public final String VALUE_HISTOGRAM = "ValueHistogram";

  static public final String LONG_VALUE_MAX = "LongValueMax";

  static public final String LONG_VALUE_MIN = "LongValueMin";

  static public final String STRING_VALUE_MAX = "StringValueMax";

  static public final String STRING_VALUE_MIN = "StringValueMin";

  /** Name of the current map task's input file; set in configure(). */
  public String inputFile = null;

  private static class MyEntry implements Entry<Text, Text> {
    Text key;

    Text val;

    public Text getKey() {
      return key;
    }

    public Text getValue() {
      return val;
    }

    public Text setValue(Text val) {
      this.val = val;
      return val;
    }

    public MyEntry(Text key, Text val) {
      this.key = key;
      this.val = val;
    }
  }

  /**
   *
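   * Illustrative example (assumes the ":" TYPE_SEPARATOR and the Text
   * constant ONE ("1") defined in the ValueAggregatorDescriptor interface):
   * <pre>
   *   Entry&lt;Text, Text&gt; e = ValueAggregatorBaseDescriptor
   *       .generateEntry(LONG_VALUE_SUM, "record_count", ONE);
   *   // e.getKey()   -&gt; Text "LongValueSum:record_count"
   *   // e.getValue() -&gt; Text "1"
   * </pre>
   *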
   * @param type the aggregation type
   * @param id the aggregation id
   * @param val the value associated with the id to be aggregated
   * @return an Entry whose key is the aggregation id prefixed with
   * the aggregation type.
   */
  public static Entry<Text, Text> generateEntry(String type,
      String id, Text val) {
    Text key = new Text(type + TYPE_SEPARATOR + id);
    return new MyEntry(key, val);
  }

  /**
   *
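   * Illustrative usage (a sketch; the values "3" and "5" fed to the
   * aggregator are made up):
   * <pre>
   *   ValueAggregator agg = ValueAggregatorBaseDescriptor
   *       .generateValueAggregator(LONG_VALUE_SUM, 0);
   *   agg.addNextValue("3");
   *   agg.addNextValue("5");
   *   // agg.getReport() -&gt; "8"
   * </pre>
   *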
   * @param type the aggregation type
   * @param uniqCount the limit on the number of unique values to keep,
   *                  used when type is UNIQ_VALUE_COUNT
   * @return a value aggregator of the given type, or null if the type
   *         is not recognized.
   */
  static public ValueAggregator generateValueAggregator(String type,
      long uniqCount) {
    if (type.compareToIgnoreCase(LONG_VALUE_SUM) == 0) {
      return new LongValueSum();
    } else if (type.compareToIgnoreCase(LONG_VALUE_MAX) == 0) {
      return new LongValueMax();
    } else if (type.compareToIgnoreCase(LONG_VALUE_MIN) == 0) {
      return new LongValueMin();
    } else if (type.compareToIgnoreCase(STRING_VALUE_MAX) == 0) {
      return new StringValueMax();
    } else if (type.compareToIgnoreCase(STRING_VALUE_MIN) == 0) {
      return new StringValueMin();
    } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) {
      return new DoubleValueSum();
    } else if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) {
      return new UniqValueCount(uniqCount);
    } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) {
      return new ValueHistogram();
    }
    return null;
  }

  /**
   * Generate 1 or 2 aggregation-id/value pairs for the given key/value pair.
   * The first id will be of type LONG_VALUE_SUM, with "record_count" as
   * its aggregation id. If the input is a file split,
   * the second id of the same type will be generated too, with the file name
   * as its aggregation id. This achieves the behavior of counting the total
   * number of records in the input data, and the number of records
   * in each input file.
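   *
   * <p>Illustrative sketch of the result (the file name "/data/part-00000"
   * is hypothetical):
   * <pre>
   *   // entry 1: key "LongValueSum:record_count",     value "1"
   *   // entry 2: key "LongValueSum:/data/part-00000", value "1"
   * </pre>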
   *
   * @param key
   *          input key
   * @param val
   *          input value
   * @return a list of aggregation id/value pairs. An aggregation id encodes an
   *         aggregation type which is used to guide the way to aggregate the
   *         value in the reduce/combiner phase of an Aggregate-based job.
   */
  public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
                                                          Object val) {
    ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
    String countType = LONG_VALUE_SUM;
    String id = "record_count";
    Entry<Text, Text> e = generateEntry(countType, id, ONE);
    if (e != null) {
      retv.add(e);
    }
    if (this.inputFile != null) {
      e = generateEntry(countType, this.inputFile, ONE);
      if (e != null) {
        retv.add(e);
      }
    }
    return retv;
  }

  /**
   * Get the name of the input file being processed by the current map task
   * from the job configuration (MRJobConfig.MAP_INPUT_FILE).
   *
   * @param conf a configuration object
   */
  public void configure(Configuration conf) {
    this.inputFile = conf.get(MRJobConfig.MAP_INPUT_FILE);
  }
}