1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.metrics2.impl; 20 21 import java.util.HashMap; 22 import javax.management.Attribute; 23 import javax.management.AttributeList; 24 import javax.management.AttributeNotFoundException; 25 import javax.management.DynamicMBean; 26 import javax.management.InvalidAttributeValueException; 27 import javax.management.MBeanException; 28 import javax.management.MBeanInfo; 29 import javax.management.ObjectName; 30 import javax.management.ReflectionException; 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.hadoop.metrics2.Metric; 34 import org.apache.hadoop.metrics2.MetricsFilter; 35 import org.apache.hadoop.metrics2.MetricsSource; 36 import org.apache.hadoop.metrics2.MetricsTag; 37 import static org.apache.hadoop.metrics2.impl.MetricsConfig.*; 38 import org.apache.hadoop.metrics2.util.MBeans; 39 import org.apache.hadoop.metrics2.util.Contracts; 40 41 /** 42 * An adapter class for metrics source and associated filter and jmx impl 43 */ 44 class MetricsSourceAdapter implements DynamicMBean { 45 46 private static final Log LOG = LogFactory.getLog(MetricsSourceAdapter.class); 47 48 private final String prefix, name; 49 private final MetricsSource source; 50 private final MetricsFilter recordFilter, metricFilter; 51 private final HashMap<String, Attribute> attrCache; 52 private final MBeanInfoBuilder infoBuilder; 53 private final Iterable<MetricsTag> injectedTags; 54 55 private Iterable<MetricsRecordImpl> lastRecs; 56 private long jmxCacheTS; 57 private int jmxCacheTTL; 58 private MBeanInfo infoCache; 59 private ObjectName mbeanName; 60 MetricsSourceAdapter(String prefix, String name, String description, MetricsSource source, Iterable<MetricsTag> injectedTags, MetricsFilter recordFilter, MetricsFilter metricFilter, int jmxCacheTTL)61 MetricsSourceAdapter(String prefix, String name, String description, 62 MetricsSource source, Iterable<MetricsTag> injectedTags, 63 MetricsFilter recordFilter, MetricsFilter metricFilter, 64 int jmxCacheTTL) { 65 this.prefix = Contracts.checkNotNull(prefix, "prefix"); 66 this.name = Contracts.checkNotNull(name, "name"); 67 this.source = Contracts.checkNotNull(source, "source"); 68 attrCache = new HashMap<String, Attribute>(); 69 infoBuilder = new MBeanInfoBuilder(name, description); 70 this.injectedTags = injectedTags; 71 this.recordFilter = recordFilter; 72 this.metricFilter = metricFilter; 73 this.jmxCacheTTL = Contracts.checkArg(jmxCacheTTL, jmxCacheTTL > 0, 74 "jmxCacheTTL"); 75 } 76 MetricsSourceAdapter(String prefix, String name, String description, MetricsSource source, Iterable<MetricsTag> injectedTags, int period, MetricsConfig conf)77 MetricsSourceAdapter(String prefix, String name, String description, 78 MetricsSource source, Iterable<MetricsTag> injectedTags, 79 int period, MetricsConfig conf) { 80 this(prefix, name, description, source, injectedTags, 81 conf.getFilter(RECORD_FILTER_KEY), 82 conf.getFilter(METRIC_FILTER_KEY), period); 83 } 84 start()85 void start() { 86 if (mbeanName != null) { 87 LOG.warn("MBean Source "+ name +" already initialized!"); 88 } 89 mbeanName = MBeans.register(prefix, name, this); 90 LOG.info("MBean for source "+ name +" registered."); 91 LOG.debug("Stacktrace: "+ new Throwable()); 92 } 93 94 @Override getAttribute(String attribute)95 public Object getAttribute(String attribute) 96 throws AttributeNotFoundException, MBeanException, ReflectionException { 97 updateJmxCache(); 98 synchronized(this) { 99 Attribute a = attrCache.get(attribute); 100 if (a == null) { 101 throw new AttributeNotFoundException(attribute +" not found"); 102 } 103 if (LOG.isDebugEnabled()) { 104 LOG.debug(attribute +": "+ a.getName() +"="+ a.getValue()); 105 } 106 return a.getValue(); 107 } 108 } 109 setAttribute(Attribute attribute)110 public void setAttribute(Attribute attribute) 111 throws AttributeNotFoundException, InvalidAttributeValueException, 112 MBeanException, ReflectionException { 113 throw new UnsupportedOperationException("Metrics are read-only."); 114 } 115 116 @Override getAttributes(String[] attributes)117 public AttributeList getAttributes(String[] attributes) { 118 updateJmxCache(); 119 synchronized(this) { 120 AttributeList ret = new AttributeList(); 121 for (String key : attributes) { 122 Attribute attr = attrCache.get(key); 123 if (LOG.isDebugEnabled()) { 124 LOG.debug(key +": "+ attr.getName() +"="+ attr.getValue()); 125 } 126 ret.add(attr); 127 } 128 return ret; 129 } 130 } 131 132 @Override setAttributes(AttributeList attributes)133 public AttributeList setAttributes(AttributeList attributes) { 134 throw new UnsupportedOperationException("Metrics are read-only."); 135 } 136 137 @Override invoke(String actionName, Object[] params, String[] signature)138 public Object invoke(String actionName, Object[] params, String[] signature) 139 throws MBeanException, ReflectionException { 140 throw new UnsupportedOperationException("Not supported yet."); 141 } 142 143 @Override getMBeanInfo()144 public MBeanInfo getMBeanInfo() { 145 updateJmxCache(); 146 return infoCache; 147 } 148 updateJmxCache()149 private void updateJmxCache() { 150 boolean getAllMetrics = false; 151 synchronized(this) { 152 if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) { 153 // temporarilly advance the expiry while updating the cache 154 jmxCacheTS = System.currentTimeMillis() + jmxCacheTTL; 155 if (lastRecs == null) { 156 getAllMetrics = true; 157 } 158 } 159 else { 160 return; 161 } 162 } 163 164 if (getAllMetrics) { 165 MetricsBuilderImpl builder = new MetricsBuilderImpl(); 166 getMetrics(builder, true); 167 } 168 169 synchronized(this) { 170 int cacheSize = attrCache.size(); // because updateAttrCache changes it! 171 int numMetrics = updateAttrCache(); 172 if (cacheSize < numMetrics) { 173 updateInfoCache(); 174 } 175 jmxCacheTS = System.currentTimeMillis(); 176 lastRecs = null; 177 } 178 } 179 getMetrics(MetricsBuilderImpl builder, boolean all)180 Iterable<MetricsRecordImpl> getMetrics(MetricsBuilderImpl builder, 181 boolean all) { 182 builder.setRecordFilter(recordFilter).setMetricFilter(metricFilter); 183 synchronized(this) { 184 if (lastRecs == null) { 185 all = true; // Get all the metrics to populate the sink caches 186 } 187 } 188 source.getMetrics(builder, all); 189 for (MetricsRecordBuilderImpl rb : builder) { 190 for (MetricsTag t : injectedTags) { 191 rb.add(t); 192 } 193 } 194 synchronized(this) { 195 lastRecs = builder.getRecords(); 196 return lastRecs; 197 } 198 } 199 stop()200 synchronized void stop() { 201 MBeans.unregister(mbeanName); 202 mbeanName = null; 203 } 204 refreshMBean()205 synchronized void refreshMBean() { 206 MBeans.unregister(mbeanName); 207 mbeanName = MBeans.register(prefix, name, this); 208 } 209 updateInfoCache()210 private void updateInfoCache() { 211 LOG.debug("Updating info cache..."); 212 infoCache = infoBuilder.reset(lastRecs).get(); 213 LOG.debug("Done"); 214 } 215 updateAttrCache()216 private int updateAttrCache() { 217 LOG.debug("Updating attr cache..."); 218 int recNo = 0; 219 int numMetrics = 0; 220 for (MetricsRecordImpl record : lastRecs) { 221 for (MetricsTag t : record.tags()) { 222 setAttrCacheTag(t, recNo); 223 ++numMetrics; 224 } 225 for (Metric m : record.metrics()) { 226 setAttrCacheMetric(m, recNo); 227 ++numMetrics; 228 } 229 ++recNo; 230 } 231 LOG.debug("Done. numMetrics="+ numMetrics); 232 return numMetrics; 233 } 234 tagName(String name, int recNo)235 private static String tagName(String name, int recNo) { 236 StringBuilder sb = new StringBuilder(name.length() + 16); 237 sb.append("tag.").append(name); 238 if (recNo > 0) { 239 sb.append('.').append(recNo); 240 } 241 return sb.toString(); 242 } 243 setAttrCacheTag(MetricsTag tag, int recNo)244 private void setAttrCacheTag(MetricsTag tag, int recNo) { 245 String key = tagName(tag.name(), recNo); 246 attrCache.put(key, new Attribute(key, tag.value())); 247 } 248 metricName(String name, int recNo)249 private static String metricName(String name, int recNo) { 250 if (recNo == 0) { 251 return name; 252 } 253 StringBuilder sb = new StringBuilder(name.length() + 12); 254 sb.append(name); 255 if (recNo > 0) { 256 sb.append('.').append(recNo); 257 } 258 return sb.toString(); 259 } 260 setAttrCacheMetric(Metric metric, int recNo)261 private void setAttrCacheMetric(Metric metric, int recNo) { 262 String key = metricName(metric.name(), recNo); 263 attrCache.put(key, new Attribute(key, metric.value())); 264 } 265 name()266 String name() { 267 return name; 268 } 269 source()270 MetricsSource source() { 271 return source; 272 } 273 274 } 275