/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

/**
 * Reducer that accumulates values based on their type.
 * <p>
 * The type is specified in the key part of the key-value pair
 * as a prefix to the key in the following way
 * <p>
 * <tt>type:key</tt>
 * <p>
 * The values are accumulated according to the types:
 * <ul>
 * <li><tt>s:</tt> - string, concatenate (each value is followed by
 *     <tt>;</tt>, including the last one)</li>
 * <li><tt>f:</tt> - float, sum (accumulated in single precision)</li>
 * <li><tt>l:</tt> - long, sum</li>
 * </ul>
 * Keys carrying none of these prefixes produce no output record.
 */
@SuppressWarnings("deprecation")
public class AccumulatingReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
  /** Key prefix selecting summation of the values as longs. */
  static final String VALUE_TYPE_LONG = "l:";
  /** Key prefix selecting summation of the values as floats. */
  static final String VALUE_TYPE_FLOAT = "f:";
  /** Key prefix selecting concatenation of the values as strings. */
  static final String VALUE_TYPE_STRING = "s:";
  private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);

  /** Local host name (or "localhost" as a fallback), used in status messages. */
  protected String hostName;

  /**
   * Creates the reducer and records the local host name.
   * <p>
   * Host name resolution is best effort: if it fails for any reason the
   * name falls back to <tt>"localhost"</tt> and construction still succeeds.
   */
  public AccumulatingReducer() {
    try {
      hostName = java.net.InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      // The host name is only used for status/log text, so a lookup failure
      // must not prevent the reducer from starting; record why we fell back.
      LOG.warn("Unable to resolve local host name, using \"localhost\"", e);
      hostName = "localhost";
    }
    LOG.info("Starting AccumulatingReducer on " + hostName);
  }

  /**
   * Accumulates all values of one key according to the key's type prefix and
   * emits a single record with the original key and the accumulated value.
   * <p>
   * The prefixes are tested in the order string, float, long; the first match
   * wins. If no prefix matches, nothing is collected. The reporter status is
   * set before processing starts and again after it completes.
   *
   * @param key      the prefixed key, <tt>type:key</tt>
   * @param values   the values to accumulate for this key
   * @param output   collector receiving the accumulated record
   * @param reporter receives the "starting" and "finished" status messages
   * @throws IOException if the collector fails to accept the record
   * @throws NumberFormatException if a value of a float or long key
   *         cannot be parsed as a number
   */
  public void reduce(Text key,
                     Iterator<Text> values,
                     OutputCollector<Text, Text> output,
                     Reporter reporter
                     ) throws IOException {
    String field = key.toString();

    reporter.setStatus("starting " + field + " ::host = " + hostName);

    if (field.startsWith(VALUE_TYPE_STRING)) {
      // concatenate strings
      output.collect(key, new Text(concatenate(values)));
    } else if (field.startsWith(VALUE_TYPE_FLOAT)) {
      // sum float values
      output.collect(key, new Text(String.valueOf(sumFloats(values))));
    } else if (field.startsWith(VALUE_TYPE_LONG)) {
      // sum long values
      output.collect(key, new Text(String.valueOf(sumLongs(values))));
    }

    reporter.setStatus("finished " + field + " ::host = " + hostName);
  }

  /** Joins the string form of every value, appending ";" after each one. */
  private static String concatenate(Iterator<Text> values) {
    StringBuilder joined = new StringBuilder();
    while (values.hasNext()) {
      joined.append(values.next().toString()).append(";");
    }
    return joined.toString();
  }

  /** Parses every value as a float and returns their single-precision sum. */
  private static float sumFloats(Iterator<Text> values) {
    float total = 0;
    while (values.hasNext()) {
      total += Float.parseFloat(values.next().toString());
    }
    return total;
  }

  /** Parses every value as a long and returns their sum. */
  private static long sumLongs(Iterator<Text> values) {
    long total = 0;
    while (values.hasNext()) {
      total += Long.parseLong(values.next().toString());
    }
    return total;
  }
}