1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.metrics2.impl;
20 
21 import java.util.HashMap;
22 import javax.management.Attribute;
23 import javax.management.AttributeList;
24 import javax.management.AttributeNotFoundException;
25 import javax.management.DynamicMBean;
26 import javax.management.InvalidAttributeValueException;
27 import javax.management.MBeanException;
28 import javax.management.MBeanInfo;
29 import javax.management.ObjectName;
30 import javax.management.ReflectionException;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.metrics2.Metric;
34 import org.apache.hadoop.metrics2.MetricsFilter;
35 import org.apache.hadoop.metrics2.MetricsSource;
36 import org.apache.hadoop.metrics2.MetricsTag;
37 import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
38 import org.apache.hadoop.metrics2.util.MBeans;
39 import org.apache.hadoop.metrics2.util.Contracts;
40 
41 /**
42  * An adapter class for metrics source and associated filter and jmx impl
43  */
44 class MetricsSourceAdapter implements DynamicMBean {
45 
46   private static final Log LOG = LogFactory.getLog(MetricsSourceAdapter.class);
47 
48   private final String prefix, name;
49   private final MetricsSource source;
50   private final MetricsFilter recordFilter, metricFilter;
51   private final HashMap<String, Attribute> attrCache;
52   private final MBeanInfoBuilder infoBuilder;
53   private final Iterable<MetricsTag> injectedTags;
54 
55   private Iterable<MetricsRecordImpl> lastRecs;
56   private long jmxCacheTS;
57   private int jmxCacheTTL;
58   private MBeanInfo infoCache;
59   private ObjectName mbeanName;
60 
MetricsSourceAdapter(String prefix, String name, String description, MetricsSource source, Iterable<MetricsTag> injectedTags, MetricsFilter recordFilter, MetricsFilter metricFilter, int jmxCacheTTL)61   MetricsSourceAdapter(String prefix, String name, String description,
62                        MetricsSource source, Iterable<MetricsTag> injectedTags,
63                        MetricsFilter recordFilter, MetricsFilter metricFilter,
64                        int jmxCacheTTL) {
65     this.prefix = Contracts.checkNotNull(prefix, "prefix");
66     this.name = Contracts.checkNotNull(name, "name");
67     this.source = Contracts.checkNotNull(source, "source");
68     attrCache = new HashMap<String, Attribute>();
69     infoBuilder = new MBeanInfoBuilder(name, description);
70     this.injectedTags = injectedTags;
71     this.recordFilter = recordFilter;
72     this.metricFilter = metricFilter;
73     this.jmxCacheTTL = Contracts.checkArg(jmxCacheTTL, jmxCacheTTL > 0,
74                                           "jmxCacheTTL");
75   }
76 
MetricsSourceAdapter(String prefix, String name, String description, MetricsSource source, Iterable<MetricsTag> injectedTags, int period, MetricsConfig conf)77   MetricsSourceAdapter(String prefix, String name, String description,
78                        MetricsSource source, Iterable<MetricsTag> injectedTags,
79                        int period, MetricsConfig conf) {
80     this(prefix, name, description, source, injectedTags,
81         conf.getFilter(RECORD_FILTER_KEY),
82         conf.getFilter(METRIC_FILTER_KEY), period);
83   }
84 
start()85   void start() {
86     if (mbeanName != null) {
87       LOG.warn("MBean Source "+ name +" already initialized!");
88     }
89     mbeanName = MBeans.register(prefix, name, this);
90     LOG.info("MBean for source "+ name +" registered.");
91     LOG.debug("Stacktrace: "+ new Throwable());
92   }
93 
94   @Override
getAttribute(String attribute)95   public Object getAttribute(String attribute)
96       throws AttributeNotFoundException, MBeanException, ReflectionException {
97     updateJmxCache();
98     synchronized(this) {
99       Attribute a = attrCache.get(attribute);
100       if (a == null) {
101         throw new AttributeNotFoundException(attribute +" not found");
102       }
103       if (LOG.isDebugEnabled()) {
104         LOG.debug(attribute +": "+ a.getName() +"="+ a.getValue());
105       }
106       return a.getValue();
107     }
108   }
109 
setAttribute(Attribute attribute)110   public void setAttribute(Attribute attribute)
111       throws AttributeNotFoundException, InvalidAttributeValueException,
112              MBeanException, ReflectionException {
113     throw new UnsupportedOperationException("Metrics are read-only.");
114   }
115 
116   @Override
getAttributes(String[] attributes)117   public AttributeList getAttributes(String[] attributes) {
118     updateJmxCache();
119     synchronized(this) {
120       AttributeList ret = new AttributeList();
121       for (String key : attributes) {
122         Attribute attr = attrCache.get(key);
123         if (LOG.isDebugEnabled()) {
124           LOG.debug(key +": "+ attr.getName() +"="+ attr.getValue());
125         }
126         ret.add(attr);
127       }
128       return ret;
129     }
130   }
131 
132   @Override
setAttributes(AttributeList attributes)133   public AttributeList setAttributes(AttributeList attributes) {
134     throw new UnsupportedOperationException("Metrics are read-only.");
135   }
136 
137   @Override
invoke(String actionName, Object[] params, String[] signature)138   public Object invoke(String actionName, Object[] params, String[] signature)
139       throws MBeanException, ReflectionException {
140     throw new UnsupportedOperationException("Not supported yet.");
141   }
142 
143   @Override
getMBeanInfo()144   public MBeanInfo getMBeanInfo() {
145     updateJmxCache();
146     return infoCache;
147   }
148 
updateJmxCache()149   private void updateJmxCache() {
150     boolean getAllMetrics = false;
151     synchronized(this) {
152       if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) {
153         // temporarilly advance the expiry while updating the cache
154         jmxCacheTS = System.currentTimeMillis() + jmxCacheTTL;
155         if (lastRecs == null) {
156           getAllMetrics = true;
157         }
158       }
159       else {
160         return;
161       }
162     }
163 
164     if (getAllMetrics) {
165       MetricsBuilderImpl builder = new MetricsBuilderImpl();
166       getMetrics(builder, true);
167     }
168 
169     synchronized(this) {
170       int cacheSize = attrCache.size(); // because updateAttrCache changes it!
171       int numMetrics = updateAttrCache();
172       if (cacheSize < numMetrics) {
173         updateInfoCache();
174       }
175       jmxCacheTS = System.currentTimeMillis();
176       lastRecs = null;
177     }
178   }
179 
getMetrics(MetricsBuilderImpl builder, boolean all)180   Iterable<MetricsRecordImpl> getMetrics(MetricsBuilderImpl builder,
181                                          boolean all) {
182     builder.setRecordFilter(recordFilter).setMetricFilter(metricFilter);
183     synchronized(this) {
184       if (lastRecs == null) {
185         all = true; // Get all the metrics to populate the sink caches
186       }
187     }
188     source.getMetrics(builder, all);
189     for (MetricsRecordBuilderImpl rb : builder) {
190       for (MetricsTag t : injectedTags) {
191         rb.add(t);
192       }
193     }
194     synchronized(this) {
195       lastRecs = builder.getRecords();
196       return lastRecs;
197     }
198   }
199 
stop()200   synchronized void stop() {
201     MBeans.unregister(mbeanName);
202     mbeanName = null;
203   }
204 
refreshMBean()205   synchronized void refreshMBean() {
206     MBeans.unregister(mbeanName);
207     mbeanName = MBeans.register(prefix, name, this);
208   }
209 
updateInfoCache()210   private void updateInfoCache() {
211     LOG.debug("Updating info cache...");
212     infoCache = infoBuilder.reset(lastRecs).get();
213     LOG.debug("Done");
214   }
215 
updateAttrCache()216   private int updateAttrCache() {
217     LOG.debug("Updating attr cache...");
218     int recNo = 0;
219     int numMetrics = 0;
220     for (MetricsRecordImpl record : lastRecs) {
221       for (MetricsTag t : record.tags()) {
222         setAttrCacheTag(t, recNo);
223         ++numMetrics;
224       }
225       for (Metric m : record.metrics()) {
226         setAttrCacheMetric(m, recNo);
227         ++numMetrics;
228       }
229       ++recNo;
230     }
231     LOG.debug("Done. numMetrics="+ numMetrics);
232     return numMetrics;
233   }
234 
tagName(String name, int recNo)235   private static String tagName(String name, int recNo) {
236     StringBuilder sb = new StringBuilder(name.length() + 16);
237     sb.append("tag.").append(name);
238     if (recNo > 0) {
239       sb.append('.').append(recNo);
240     }
241     return sb.toString();
242   }
243 
setAttrCacheTag(MetricsTag tag, int recNo)244   private void setAttrCacheTag(MetricsTag tag, int recNo) {
245     String key = tagName(tag.name(), recNo);
246     attrCache.put(key, new Attribute(key, tag.value()));
247   }
248 
metricName(String name, int recNo)249   private static String metricName(String name, int recNo) {
250     if (recNo == 0) {
251       return name;
252     }
253     StringBuilder sb = new StringBuilder(name.length() + 12);
254     sb.append(name);
255     if (recNo > 0) {
256       sb.append('.').append(recNo);
257     }
258     return sb.toString();
259   }
260 
setAttrCacheMetric(Metric metric, int recNo)261   private void setAttrCacheMetric(Metric metric, int recNo) {
262     String key = metricName(metric.name(), recNo);
263     attrCache.put(key, new Attribute(key, metric.value()));
264   }
265 
name()266   String name() {
267     return name;
268   }
269 
source()270   MetricsSource source() {
271     return source;
272   }
273 
274 }
275