1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapreduce.counters;
20 
21 import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId;
22 import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup;
23 
24 import java.io.DataInput;
25 import java.io.DataOutput;
26 import java.io.IOException;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentSkipListMap;
31 
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.classification.InterfaceAudience;
35 import org.apache.hadoop.classification.InterfaceStability;
36 import org.apache.hadoop.io.Text;
37 import org.apache.hadoop.io.Writable;
38 import org.apache.hadoop.io.WritableUtils;
39 import org.apache.hadoop.mapreduce.Counter;
40 import org.apache.hadoop.mapreduce.FileSystemCounter;
41 import org.apache.hadoop.mapreduce.JobCounter;
42 import org.apache.hadoop.mapreduce.TaskCounter;
43 import org.apache.hadoop.util.StringInterner;
44 
45 import com.google.common.collect.Iterables;
46 import com.google.common.collect.Iterators;
47 import com.google.common.collect.Maps;
48 
49 /**
50  * An abstract class to provide common implementation for the Counters
51  * container in both mapred and mapreduce packages.
52  *
53  * @param <C> type of counter inside the counters
54  * @param <G> type of group inside the counters
55  */
56 @InterfaceAudience.Public
57 @InterfaceStability.Stable
58 public abstract class AbstractCounters<C extends Counter,
59                                        G extends CounterGroupBase<C>>
60     implements Writable, Iterable<G> {
61 
62   protected static final Log LOG = LogFactory.getLog("mapreduce.Counters");
63 
64   /**
65    * A cache from enum values to the associated counter.
66    */
67   private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
68   //framework & fs groups
69   private Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
70   // other groups
71   private Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
72   private final CounterGroupFactory<C, G> groupFactory;
73 
74   // For framework counter serialization without strings
75   enum GroupType { FRAMEWORK, FILESYSTEM };
76 
77   // Writes only framework and fs counters if false.
78   private boolean writeAllCounters = true;
79 
80   private static final Map<String, String> legacyMap = Maps.newHashMap();
81   static {
82     legacyMap.put("org.apache.hadoop.mapred.Task$Counter",
83                   TaskCounter.class.getName());
84     legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter",
85                   JobCounter.class.getName());
86     legacyMap.put("FileSystemCounters", FileSystemCounter.class.getName());
87   }
88 
89   private final Limits limits = new Limits();
90 
91   @InterfaceAudience.Private
AbstractCounters(CounterGroupFactory<C, G> gf)92   public AbstractCounters(CounterGroupFactory<C, G> gf) {
93     groupFactory = gf;
94   }
95 
96   /**
97    * Construct from another counters object.
98    * @param <C1> type of the other counter
99    * @param <G1> type of the other counter group
100    * @param counters the counters object to copy
101    * @param groupFactory the factory for new groups
102    */
103   @InterfaceAudience.Private
104   public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
AbstractCounters(AbstractCounters<C1, G1> counters, CounterGroupFactory<C, G> groupFactory)105   AbstractCounters(AbstractCounters<C1, G1> counters,
106                    CounterGroupFactory<C, G> groupFactory) {
107     this.groupFactory = groupFactory;
108     for(G1 group: counters) {
109       String name = group.getName();
110       G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
111       (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
112       for(Counter counter: group) {
113         newGroup.addCounter(counter.getName(), counter.getDisplayName(),
114                             counter.getValue());
115       }
116     }
117   }
118 
119   /** Add a group.
120    * @param group object to add
121    * @return the group
122    */
123   @InterfaceAudience.Private
addGroup(G group)124   public synchronized G addGroup(G group) {
125     String name = group.getName();
126     if (isFrameworkGroup(name)) {
127       fgroups.put(name, group);
128     } else {
129       limits.checkGroups(groups.size() + 1);
130       groups.put(name, group);
131     }
132     return group;
133   }
134 
135   /**
136    * Add a new group
137    * @param name of the group
138    * @param displayName of the group
139    * @return the group
140    */
141   @InterfaceAudience.Private
addGroup(String name, String displayName)142   public G addGroup(String name, String displayName) {
143     return addGroup(groupFactory.newGroup(name, displayName, limits));
144   }
145 
146   /**
147    * Find a counter, create one if necessary
148    * @param groupName of the counter
149    * @param counterName name of the counter
150    * @return the matching counter
151    */
findCounter(String groupName, String counterName)152   public C findCounter(String groupName, String counterName) {
153     G grp = getGroup(groupName);
154     return grp.findCounter(counterName);
155   }
156 
157   /**
158    * Find the counter for the given enum. The same enum will always return the
159    * same counter.
160    * @param key the counter key
161    * @return the matching counter object
162    */
findCounter(Enum<?> key)163   public synchronized C findCounter(Enum<?> key) {
164     C counter = cache.get(key);
165     if (counter == null) {
166       counter = findCounter(key.getDeclaringClass().getName(), key.name());
167       cache.put(key, counter);
168     }
169     return counter;
170   }
171 
172   /**
173    * Find the file system counter for the given scheme and enum.
174    * @param scheme of the file system
175    * @param key the enum of the counter
176    * @return the file system counter
177    */
178   @InterfaceAudience.Private
findCounter(String scheme, FileSystemCounter key)179   public synchronized C findCounter(String scheme, FileSystemCounter key) {
180     return ((FileSystemCounterGroup<C>) getGroup(
181         FileSystemCounter.class.getName()).getUnderlyingGroup()).
182         findCounter(scheme, key);
183   }
184 
185   /**
186    * Returns the names of all counter classes.
187    * @return Set of counter names.
188    */
getGroupNames()189   public synchronized Iterable<String> getGroupNames() {
190     HashSet<String> deprecated = new HashSet<String>();
191     for(Map.Entry<String, String> entry : legacyMap.entrySet()) {
192       String newGroup = entry.getValue();
193       boolean isFGroup = isFrameworkGroup(newGroup);
194       if(isFGroup ? fgroups.containsKey(newGroup) : groups.containsKey(newGroup)) {
195         deprecated.add(entry.getKey());
196       }
197     }
198     return Iterables.concat(fgroups.keySet(), groups.keySet(), deprecated);
199   }
200 
201   @Override
iterator()202   public Iterator<G> iterator() {
203     return Iterators.concat(fgroups.values().iterator(),
204                             groups.values().iterator());
205   }
206 
207   /**
208    * Returns the named counter group, or an empty group if there is none
209    * with the specified name.
210    * @param groupName name of the group
211    * @return the group
212    */
getGroup(String groupName)213   public synchronized G getGroup(String groupName) {
214 
215     // filterGroupName
216     boolean groupNameInLegacyMap = true;
217     String newGroupName = legacyMap.get(groupName);
218     if (newGroupName == null) {
219       groupNameInLegacyMap = false;
220       newGroupName = Limits.filterGroupName(groupName);
221     }
222 
223     boolean isFGroup = isFrameworkGroup(newGroupName);
224     G group = isFGroup ? fgroups.get(newGroupName) : groups.get(newGroupName);
225     if (group == null) {
226       group = groupFactory.newGroup(newGroupName, limits);
227       if (isFGroup) {
228         fgroups.put(newGroupName, group);
229       } else {
230         limits.checkGroups(groups.size() + 1);
231         groups.put(newGroupName, group);
232       }
233       if (groupNameInLegacyMap) {
234         LOG.warn("Group " + groupName + " is deprecated. Use " + newGroupName
235             + " instead");
236       }
237     }
238     return group;
239   }
240 
241   /**
242    * Returns the total number of counters, by summing the number of counters
243    * in each group.
244    * @return the total number of counters
245    */
countCounters()246   public synchronized int countCounters() {
247     int result = 0;
248     for (G group : this) {
249       result += group.size();
250     }
251     return result;
252   }
253 
254   /**
255    * Write the set of groups.
256    * Counters ::= version #fgroups (groupId, group)* #groups (group)*
257    */
258   @Override
write(DataOutput out)259   public synchronized void write(DataOutput out) throws IOException {
260     WritableUtils.writeVInt(out, groupFactory.version());
261     WritableUtils.writeVInt(out, fgroups.size());  // framework groups first
262     for (G group : fgroups.values()) {
263       if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
264         WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
265         WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
266         group.write(out);
267       } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
268         WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
269         group.write(out);
270       }
271     }
272     if (writeAllCounters) {
273       WritableUtils.writeVInt(out, groups.size());
274       for (G group : groups.values()) {
275         Text.writeString(out, group.getName());
276         group.write(out);
277       }
278     } else {
279       WritableUtils.writeVInt(out, 0);
280     }
281   }
282 
283   @Override
readFields(DataInput in)284   public synchronized void readFields(DataInput in) throws IOException {
285     int version = WritableUtils.readVInt(in);
286     if (version != groupFactory.version()) {
287       throw new IOException("Counters version mismatch, expected "+
288           groupFactory.version() +" got "+ version);
289     }
290     int numFGroups = WritableUtils.readVInt(in);
291     fgroups.clear();
292     GroupType[] groupTypes = GroupType.values();
293     while (numFGroups-- > 0) {
294       GroupType groupType = groupTypes[WritableUtils.readVInt(in)];
295       G group;
296       switch (groupType) {
297         case FILESYSTEM: // with nothing
298           group = groupFactory.newFileSystemGroup();
299           break;
300         case FRAMEWORK:  // with group id
301           group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
302           break;
303         default: // Silence dumb compiler, as it would've thrown earlier
304           throw new IOException("Unexpected counter group type: "+ groupType);
305       }
306       group.readFields(in);
307       fgroups.put(group.getName(), group);
308     }
309     int numGroups = WritableUtils.readVInt(in);
310     while (numGroups-- > 0) {
311       limits.checkGroups(groups.size() + 1);
312       G group = groupFactory.newGenericGroup(
313           StringInterner.weakIntern(Text.readString(in)), null, limits);
314       group.readFields(in);
315       groups.put(group.getName(), group);
316     }
317   }
318 
319   /**
320    * Return textual representation of the counter values.
321    * @return the string
322    */
323   @Override
toString()324   public synchronized String toString() {
325     StringBuilder sb = new StringBuilder("Counters: " + countCounters());
326     for (G group: this) {
327       sb.append("\n\t").append(group.getDisplayName());
328       for (Counter counter: group) {
329         sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
330           .append(counter.getValue());
331       }
332     }
333     return sb.toString();
334   }
335 
336   /**
337    * Increments multiple counters by their amounts in another Counters
338    * instance.
339    * @param other the other Counters instance
340    */
incrAllCounters(AbstractCounters<C, G> other)341   public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
342     for(G right : other) {
343       String groupName = right.getName();
344       G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName);
345       if (left == null) {
346         left = addGroup(groupName, right.getDisplayName());
347       }
348       left.incrAllCounters(right);
349     }
350   }
351 
352   @Override
353   @SuppressWarnings("unchecked")
equals(Object genericRight)354   public boolean equals(Object genericRight) {
355     if (genericRight instanceof AbstractCounters<?, ?>) {
356       return Iterators.elementsEqual(iterator(),
357           ((AbstractCounters<C, G>)genericRight).iterator());
358     }
359     return false;
360   }
361 
362   @Override
hashCode()363   public int hashCode() {
364     return groups.hashCode();
365   }
366 
367   /**
368    * Set the "writeAllCounters" option to true or false
369    * @param send  if true all counters would be serialized, otherwise only
370    *              framework counters would be serialized in
371    *              {@link #write(DataOutput)}
372    */
373   @InterfaceAudience.Private
setWriteAllCounters(boolean send)374   public void setWriteAllCounters(boolean send) {
375     writeAllCounters = send;
376   }
377 
378   /**
379    * Get the "writeAllCounters" option
380    * @return true of all counters would serialized
381    */
382   @InterfaceAudience.Private
getWriteAllCounters()383   public boolean getWriteAllCounters() {
384     return writeAllCounters;
385   }
386 
387   @InterfaceAudience.Private
limits()388   public Limits limits() {
389     return limits;
390   }
391 }
392