1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.contrib.utils.join;
20 
21 import java.util.SortedMap;
22 import java.util.TreeMap;
23 import java.util.Map.Entry;
24 import java.util.Iterator;
25 
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.mapred.JobConf;
29 import org.apache.hadoop.mapred.Mapper;
30 import org.apache.hadoop.mapred.Reducer;
31 
32 /**
33  * A common base implementing some statics collecting mechanisms that are
34  * commonly used in a typical map/reduce job.
35  *
36  */
37 public abstract class JobBase implements Mapper, Reducer {
38 
39   public static final Log LOG = LogFactory.getLog("datajoin.job");
40 
41   private SortedMap<Object, Long> longCounters = null;
42 
43   private SortedMap<Object, Double> doubleCounters = null;
44 
45   /**
46    * Set the given counter to the given value
47    *
48    * @param name
49    *          the counter name
50    * @param value
51    *          the value for the counter
52    */
setLongValue(Object name, long value)53   protected void setLongValue(Object name, long value) {
54     this.longCounters.put(name, Long.valueOf(value));
55   }
56 
57   /**
58    * Set the given counter to the given value
59    *
60    * @param name
61    *          the counter name
62    * @param value
63    *          the value for the counter
64    */
setDoubleValue(Object name, double value)65   protected void setDoubleValue(Object name, double value) {
66     this.doubleCounters.put(name, new Double(value));
67   }
68 
69   /**
70    *
71    * @param name
72    *          the counter name
73    * @return return the value of the given counter.
74    */
getLongValue(Object name)75   protected Long getLongValue(Object name) {
76     return this.longCounters.get(name);
77   }
78 
79   /**
80    *
81    * @param name
82    *          the counter name
83    * @return return the value of the given counter.
84    */
getDoubleValue(Object name)85   protected Double getDoubleValue(Object name) {
86     return this.doubleCounters.get(name);
87   }
88 
89   /**
90    * Increment the given counter by the given incremental value If the counter
91    * does not exist, one is created with value 0.
92    *
93    * @param name
94    *          the counter name
95    * @param inc
96    *          the incremental value
97    * @return the updated value.
98    */
addLongValue(Object name, long inc)99   protected Long addLongValue(Object name, long inc) {
100     Long val = this.longCounters.get(name);
101     Long retv = null;
102     if (val == null) {
103       retv = Long.valueOf(inc);
104     } else {
105       retv = Long.valueOf(val.longValue() + inc);
106     }
107     this.longCounters.put(name, retv);
108     return retv;
109   }
110 
111   /**
112    * Increment the given counter by the given incremental value If the counter
113    * does not exist, one is created with value 0.
114    *
115    * @param name
116    *          the counter name
117    * @param inc
118    *          the incremental value
119    * @return the updated value.
120    */
addDoubleValue(Object name, double inc)121   protected Double addDoubleValue(Object name, double inc) {
122     Double val = this.doubleCounters.get(name);
123     Double retv = null;
124     if (val == null) {
125       retv = new Double(inc);
126     } else {
127       retv = new Double(val.doubleValue() + inc);
128     }
129     this.doubleCounters.put(name, retv);
130     return retv;
131   }
132 
133   /**
134    * log the counters
135    *
136    */
report()137   protected void report() {
138     LOG.info(getReport());
139   }
140 
141   /**
142    * log the counters
143    *
144    */
getReport()145   protected String getReport() {
146     StringBuffer sb = new StringBuffer();
147 
148     Iterator iter = this.longCounters.entrySet().iterator();
149     while (iter.hasNext()) {
150       Entry e = (Entry) iter.next();
151       sb.append(e.getKey().toString()).append("\t").append(e.getValue())
152         .append("\n");
153     }
154     iter = this.doubleCounters.entrySet().iterator();
155     while (iter.hasNext()) {
156       Entry e = (Entry) iter.next();
157       sb.append(e.getKey().toString()).append("\t").append(e.getValue())
158         .append("\n");
159     }
160     return sb.toString();
161   }
162 
163   /**
164    * Initializes a new instance from a {@link JobConf}.
165    *
166    * @param job
167    *          the configuration
168    */
configure(JobConf job)169   public void configure(JobConf job) {
170     this.longCounters = new TreeMap<Object, Long>();
171     this.doubleCounters = new TreeMap<Object, Double>();
172   }
173 }
174