1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.fs;
19 
20 import java.io.IOException;
21 import java.util.Iterator;
22 
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.io.Text;
26 import org.apache.hadoop.mapred.*;
27 
28 /**
29  * Reducer that accumulates values based on their type.
30  * <p>
31  * The type is specified in the key part of the key-value pair
32  * as a prefix to the key in the following way
33  * <p>
34  * <tt>type:key</tt>
35  * <p>
36  * The values are accumulated according to the types:
37  * <ul>
38  * <li><tt>s:</tt> - string, concatenate</li>
39  * <li><tt>f:</tt> - float, summ</li>
40  * <li><tt>l:</tt> - long, summ</li>
41  * </ul>
42  *
43  */
44 @SuppressWarnings("deprecation")
45 public class AccumulatingReducer extends MapReduceBase
46     implements Reducer<Text, Text, Text, Text> {
47   static final String VALUE_TYPE_LONG = "l:";
48   static final String VALUE_TYPE_FLOAT = "f:";
49   static final String VALUE_TYPE_STRING = "s:";
50   private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);
51 
52   protected String hostName;
53 
AccumulatingReducer()54   public AccumulatingReducer () {
55     try {
56       hostName = java.net.InetAddress.getLocalHost().getHostName();
57     } catch(Exception e) {
58       hostName = "localhost";
59     }
60     LOG.info("Starting AccumulatingReducer on " + hostName);
61   }
62 
reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter )63   public void reduce(Text key,
64                      Iterator<Text> values,
65                      OutputCollector<Text, Text> output,
66                      Reporter reporter
67                      ) throws IOException {
68     String field = key.toString();
69 
70     reporter.setStatus("starting " + field + " ::host = " + hostName);
71 
72     // concatenate strings
73     if (field.startsWith(VALUE_TYPE_STRING)) {
74       StringBuffer sSum = new StringBuffer();
75       while (values.hasNext())
76         sSum.append(values.next().toString()).append(";");
77       output.collect(key, new Text(sSum.toString()));
78       reporter.setStatus("finished " + field + " ::host = " + hostName);
79       return;
80     }
81     // sum long values
82     if (field.startsWith(VALUE_TYPE_FLOAT)) {
83       float fSum = 0;
84       while (values.hasNext())
85         fSum += Float.parseFloat(values.next().toString());
86       output.collect(key, new Text(String.valueOf(fSum)));
87       reporter.setStatus("finished " + field + " ::host = " + hostName);
88       return;
89     }
90     // sum long values
91     if (field.startsWith(VALUE_TYPE_LONG)) {
92       long lSum = 0;
93       while (values.hasNext()) {
94         lSum += Long.parseLong(values.next().toString());
95       }
96       output.collect(key, new Text(String.valueOf(lSum)));
97     }
98     reporter.setStatus("finished " + field + " ::host = " + hostName);
99   }
100 }
101