1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.contrib.utils.join; 20 21 import java.util.SortedMap; 22 import java.util.TreeMap; 23 import java.util.Map.Entry; 24 import java.util.Iterator; 25 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import org.apache.hadoop.mapred.JobConf; 29 import org.apache.hadoop.mapred.Mapper; 30 import org.apache.hadoop.mapred.Reducer; 31 32 /** 33 * A common base implementing some statics collecting mechanisms that are 34 * commonly used in a typical map/reduce job. 35 * 36 */ 37 public abstract class JobBase implements Mapper, Reducer { 38 39 public static final Log LOG = LogFactory.getLog("datajoin.job"); 40 41 private SortedMap<Object, Long> longCounters = null; 42 43 private SortedMap<Object, Double> doubleCounters = null; 44 45 /** 46 * Set the given counter to the given value 47 * 48 * @param name 49 * the counter name 50 * @param value 51 * the value for the counter 52 */ setLongValue(Object name, long value)53 protected void setLongValue(Object name, long value) { 54 this.longCounters.put(name, Long.valueOf(value)); 55 } 56 57 /** 58 * Set the given counter to the given value 59 * 60 * @param name 61 * the counter name 62 * @param value 63 * the value for the counter 64 */ setDoubleValue(Object name, double value)65 protected void setDoubleValue(Object name, double value) { 66 this.doubleCounters.put(name, new Double(value)); 67 } 68 69 /** 70 * 71 * @param name 72 * the counter name 73 * @return return the value of the given counter. 74 */ getLongValue(Object name)75 protected Long getLongValue(Object name) { 76 return this.longCounters.get(name); 77 } 78 79 /** 80 * 81 * @param name 82 * the counter name 83 * @return return the value of the given counter. 84 */ getDoubleValue(Object name)85 protected Double getDoubleValue(Object name) { 86 return this.doubleCounters.get(name); 87 } 88 89 /** 90 * Increment the given counter by the given incremental value If the counter 91 * does not exist, one is created with value 0. 92 * 93 * @param name 94 * the counter name 95 * @param inc 96 * the incremental value 97 * @return the updated value. 98 */ addLongValue(Object name, long inc)99 protected Long addLongValue(Object name, long inc) { 100 Long val = this.longCounters.get(name); 101 Long retv = null; 102 if (val == null) { 103 retv = Long.valueOf(inc); 104 } else { 105 retv = Long.valueOf(val.longValue() + inc); 106 } 107 this.longCounters.put(name, retv); 108 return retv; 109 } 110 111 /** 112 * Increment the given counter by the given incremental value If the counter 113 * does not exist, one is created with value 0. 114 * 115 * @param name 116 * the counter name 117 * @param inc 118 * the incremental value 119 * @return the updated value. 120 */ addDoubleValue(Object name, double inc)121 protected Double addDoubleValue(Object name, double inc) { 122 Double val = this.doubleCounters.get(name); 123 Double retv = null; 124 if (val == null) { 125 retv = new Double(inc); 126 } else { 127 retv = new Double(val.doubleValue() + inc); 128 } 129 this.doubleCounters.put(name, retv); 130 return retv; 131 } 132 133 /** 134 * log the counters 135 * 136 */ report()137 protected void report() { 138 LOG.info(getReport()); 139 } 140 141 /** 142 * log the counters 143 * 144 */ getReport()145 protected String getReport() { 146 StringBuffer sb = new StringBuffer(); 147 148 Iterator iter = this.longCounters.entrySet().iterator(); 149 while (iter.hasNext()) { 150 Entry e = (Entry) iter.next(); 151 sb.append(e.getKey().toString()).append("\t").append(e.getValue()) 152 .append("\n"); 153 } 154 iter = this.doubleCounters.entrySet().iterator(); 155 while (iter.hasNext()) { 156 Entry e = (Entry) iter.next(); 157 sb.append(e.getKey().toString()).append("\t").append(e.getValue()) 158 .append("\n"); 159 } 160 return sb.toString(); 161 } 162 163 /** 164 * Initializes a new instance from a {@link JobConf}. 165 * 166 * @param job 167 * the configuration 168 */ configure(JobConf job)169 public void configure(JobConf job) { 170 this.longCounters = new TreeMap<Object, Long>(); 171 this.doubleCounters = new TreeMap<Object, Double>(); 172 } 173 } 174