1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapreduce.counters;
20 
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.Iterator;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ConcurrentSkipListMap;
27 
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.io.Text;
30 import org.apache.hadoop.io.WritableUtils;
31 import org.apache.hadoop.mapreduce.Counter;
32 import org.apache.hadoop.mapreduce.util.ResourceBundles;
33 import org.apache.hadoop.util.StringInterner;
34 
35 import com.google.common.collect.Iterators;
36 
37 /**
38  * An abstract class to provide common implementation of the
39  * generic counter group in both mapred and mapreduce package.
40  *
41  * @param <T> type of the counter for the group
42  */
43 @InterfaceAudience.Private
44 public abstract class AbstractCounterGroup<T extends Counter>
45     implements CounterGroupBase<T> {
46 
47   private final String name;
48   private String displayName;
49   private final ConcurrentMap<String, T> counters =
50       new ConcurrentSkipListMap<String, T>();
51   private final Limits limits;
52 
AbstractCounterGroup(String name, String displayName, Limits limits)53   public AbstractCounterGroup(String name, String displayName,
54                               Limits limits) {
55     this.name = name;
56     this.displayName = displayName;
57     this.limits = limits;
58   }
59 
60   @Override
getName()61   public String getName() {
62     return name;
63   }
64 
65   @Override
getDisplayName()66   public synchronized String getDisplayName() {
67     return displayName;
68   }
69 
70   @Override
setDisplayName(String displayName)71   public synchronized void setDisplayName(String displayName) {
72     this.displayName = displayName;
73   }
74 
75   @Override
addCounter(T counter)76   public synchronized void addCounter(T counter) {
77     counters.put(counter.getName(), counter);
78     limits.incrCounters();
79   }
80 
81   @Override
addCounter(String counterName, String displayName, long value)82   public synchronized T addCounter(String counterName, String displayName,
83                                    long value) {
84     String saveName = Limits.filterCounterName(counterName);
85     T counter = findCounterImpl(saveName, false);
86     if (counter == null) {
87       return addCounterImpl(saveName, displayName, value);
88     }
89     counter.setValue(value);
90     return counter;
91   }
92 
addCounterImpl(String name, String displayName, long value)93   private T addCounterImpl(String name, String displayName, long value) {
94     T counter = newCounter(name, displayName, value);
95     addCounter(counter);
96     return counter;
97   }
98 
99   @Override
findCounter(String counterName, String displayName)100   public synchronized T findCounter(String counterName, String displayName) {
101     // Take lock to avoid two threads not finding a counter and trying to add
102     // the same counter.
103     String saveName = Limits.filterCounterName(counterName);
104     T counter = findCounterImpl(saveName, false);
105     if (counter == null) {
106       return addCounterImpl(saveName, displayName, 0);
107     }
108     return counter;
109   }
110 
111   @Override
findCounter(String counterName, boolean create)112   public T findCounter(String counterName, boolean create) {
113     return findCounterImpl(Limits.filterCounterName(counterName), create);
114   }
115 
116   // Lock the object. Cannot simply use concurrent constructs on the counters
117   // data-structure (like putIfAbsent) because of localization, limits etc.
findCounterImpl(String counterName, boolean create)118   private synchronized T findCounterImpl(String counterName, boolean create) {
119     T counter = counters.get(counterName);
120     if (counter == null && create) {
121       String localized =
122           ResourceBundles.getCounterName(getName(), counterName, counterName);
123       return addCounterImpl(counterName, localized, 0);
124     }
125     return counter;
126   }
127 
128   @Override
findCounter(String counterName)129   public T findCounter(String counterName) {
130     return findCounter(counterName, true);
131   }
132 
133   /**
134    * Abstract factory method to create a new counter of type T
135    * @param counterName of the counter
136    * @param displayName of the counter
137    * @param value of the counter
138    * @return a new counter
139    */
newCounter(String counterName, String displayName, long value)140   protected abstract T newCounter(String counterName, String displayName,
141                                   long value);
142 
143   /**
144    * Abstract factory method to create a new counter of type T
145    * @return a new counter object
146    */
newCounter()147   protected abstract T newCounter();
148 
149   @Override
iterator()150   public Iterator<T> iterator() {
151     return counters.values().iterator();
152   }
153 
154   /**
155    * GenericGroup ::= displayName #counter counter*
156    */
157   @Override
write(DataOutput out)158   public synchronized void write(DataOutput out) throws IOException {
159     Text.writeString(out, displayName);
160     WritableUtils.writeVInt(out, counters.size());
161     for(Counter counter: counters.values()) {
162       counter.write(out);
163     }
164   }
165 
166   @Override
readFields(DataInput in)167   public synchronized void readFields(DataInput in) throws IOException {
168     displayName = StringInterner.weakIntern(Text.readString(in));
169     counters.clear();
170     int size = WritableUtils.readVInt(in);
171     for (int i = 0; i < size; i++) {
172       T counter = newCounter();
173       counter.readFields(in);
174       counters.put(counter.getName(), counter);
175       limits.incrCounters();
176     }
177   }
178 
179   @Override
size()180   public synchronized int size() {
181     return counters.size();
182   }
183 
184   @Override
equals(Object genericRight)185   public synchronized boolean equals(Object genericRight) {
186     if (genericRight instanceof CounterGroupBase<?>) {
187       @SuppressWarnings("unchecked")
188       CounterGroupBase<T> right = (CounterGroupBase<T>) genericRight;
189       return Iterators.elementsEqual(iterator(), right.iterator());
190     }
191     return false;
192   }
193 
194   @Override
hashCode()195   public synchronized int hashCode() {
196     return counters.hashCode();
197   }
198 
199   @Override
incrAllCounters(CounterGroupBase<T> rightGroup)200   public void incrAllCounters(CounterGroupBase<T> rightGroup) {
201     try {
202       for (Counter right : rightGroup) {
203         Counter left = findCounter(right.getName(), right.getDisplayName());
204         left.increment(right.getValue());
205       }
206     } catch (LimitExceededException e) {
207       counters.clear();
208       throw e;
209     }
210   }
211 }
212