1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.counters; 20 21 import java.io.DataInput; 22 import java.io.DataOutput; 23 import java.io.IOException; 24 import java.util.Iterator; 25 import java.util.concurrent.ConcurrentMap; 26 import java.util.concurrent.ConcurrentSkipListMap; 27 28 import org.apache.hadoop.classification.InterfaceAudience; 29 import org.apache.hadoop.io.Text; 30 import org.apache.hadoop.io.WritableUtils; 31 import org.apache.hadoop.mapreduce.Counter; 32 import org.apache.hadoop.mapreduce.util.ResourceBundles; 33 import org.apache.hadoop.util.StringInterner; 34 35 import com.google.common.collect.Iterators; 36 37 /** 38 * An abstract class to provide common implementation of the 39 * generic counter group in both mapred and mapreduce package. 40 * 41 * @param <T> type of the counter for the group 42 */ 43 @InterfaceAudience.Private 44 public abstract class AbstractCounterGroup<T extends Counter> 45 implements CounterGroupBase<T> { 46 47 private final String name; 48 private String displayName; 49 private final ConcurrentMap<String, T> counters = 50 new ConcurrentSkipListMap<String, T>(); 51 private final Limits limits; 52 AbstractCounterGroup(String name, String displayName, Limits limits)53 public AbstractCounterGroup(String name, String displayName, 54 Limits limits) { 55 this.name = name; 56 this.displayName = displayName; 57 this.limits = limits; 58 } 59 60 @Override getName()61 public String getName() { 62 return name; 63 } 64 65 @Override getDisplayName()66 public synchronized String getDisplayName() { 67 return displayName; 68 } 69 70 @Override setDisplayName(String displayName)71 public synchronized void setDisplayName(String displayName) { 72 this.displayName = displayName; 73 } 74 75 @Override addCounter(T counter)76 public synchronized void addCounter(T counter) { 77 counters.put(counter.getName(), counter); 78 limits.incrCounters(); 79 } 80 81 @Override addCounter(String counterName, String displayName, long value)82 public synchronized T addCounter(String counterName, String displayName, 83 long value) { 84 String saveName = Limits.filterCounterName(counterName); 85 T counter = findCounterImpl(saveName, false); 86 if (counter == null) { 87 return addCounterImpl(saveName, displayName, value); 88 } 89 counter.setValue(value); 90 return counter; 91 } 92 addCounterImpl(String name, String displayName, long value)93 private T addCounterImpl(String name, String displayName, long value) { 94 T counter = newCounter(name, displayName, value); 95 addCounter(counter); 96 return counter; 97 } 98 99 @Override findCounter(String counterName, String displayName)100 public synchronized T findCounter(String counterName, String displayName) { 101 // Take lock to avoid two threads not finding a counter and trying to add 102 // the same counter. 103 String saveName = Limits.filterCounterName(counterName); 104 T counter = findCounterImpl(saveName, false); 105 if (counter == null) { 106 return addCounterImpl(saveName, displayName, 0); 107 } 108 return counter; 109 } 110 111 @Override findCounter(String counterName, boolean create)112 public T findCounter(String counterName, boolean create) { 113 return findCounterImpl(Limits.filterCounterName(counterName), create); 114 } 115 116 // Lock the object. Cannot simply use concurrent constructs on the counters 117 // data-structure (like putIfAbsent) because of localization, limits etc. findCounterImpl(String counterName, boolean create)118 private synchronized T findCounterImpl(String counterName, boolean create) { 119 T counter = counters.get(counterName); 120 if (counter == null && create) { 121 String localized = 122 ResourceBundles.getCounterName(getName(), counterName, counterName); 123 return addCounterImpl(counterName, localized, 0); 124 } 125 return counter; 126 } 127 128 @Override findCounter(String counterName)129 public T findCounter(String counterName) { 130 return findCounter(counterName, true); 131 } 132 133 /** 134 * Abstract factory method to create a new counter of type T 135 * @param counterName of the counter 136 * @param displayName of the counter 137 * @param value of the counter 138 * @return a new counter 139 */ newCounter(String counterName, String displayName, long value)140 protected abstract T newCounter(String counterName, String displayName, 141 long value); 142 143 /** 144 * Abstract factory method to create a new counter of type T 145 * @return a new counter object 146 */ newCounter()147 protected abstract T newCounter(); 148 149 @Override iterator()150 public Iterator<T> iterator() { 151 return counters.values().iterator(); 152 } 153 154 /** 155 * GenericGroup ::= displayName #counter counter* 156 */ 157 @Override write(DataOutput out)158 public synchronized void write(DataOutput out) throws IOException { 159 Text.writeString(out, displayName); 160 WritableUtils.writeVInt(out, counters.size()); 161 for(Counter counter: counters.values()) { 162 counter.write(out); 163 } 164 } 165 166 @Override readFields(DataInput in)167 public synchronized void readFields(DataInput in) throws IOException { 168 displayName = StringInterner.weakIntern(Text.readString(in)); 169 counters.clear(); 170 int size = WritableUtils.readVInt(in); 171 for (int i = 0; i < size; i++) { 172 T counter = newCounter(); 173 counter.readFields(in); 174 counters.put(counter.getName(), counter); 175 limits.incrCounters(); 176 } 177 } 178 179 @Override size()180 public synchronized int size() { 181 return counters.size(); 182 } 183 184 @Override equals(Object genericRight)185 public synchronized boolean equals(Object genericRight) { 186 if (genericRight instanceof CounterGroupBase<?>) { 187 @SuppressWarnings("unchecked") 188 CounterGroupBase<T> right = (CounterGroupBase<T>) genericRight; 189 return Iterators.elementsEqual(iterator(), right.iterator()); 190 } 191 return false; 192 } 193 194 @Override hashCode()195 public synchronized int hashCode() { 196 return counters.hashCode(); 197 } 198 199 @Override incrAllCounters(CounterGroupBase<T> rightGroup)200 public void incrAllCounters(CounterGroupBase<T> rightGroup) { 201 try { 202 for (Counter right : rightGroup) { 203 Counter left = findCounter(right.getName(), right.getDisplayName()); 204 left.increment(right.getValue()); 205 } 206 } catch (LimitExceededException e) { 207 counters.clear(); 208 throw e; 209 } 210 } 211 } 212