1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.client; 19 20 import com.google.common.annotations.VisibleForTesting; 21 import com.google.protobuf.Descriptors.MethodDescriptor; 22 import com.google.protobuf.Message; 23 import com.yammer.metrics.core.Counter; 24 import com.yammer.metrics.core.Histogram; 25 import com.yammer.metrics.core.MetricsRegistry; 26 import com.yammer.metrics.core.Timer; 27 import com.yammer.metrics.reporting.JmxReporter; 28 import com.yammer.metrics.util.RatioGauge; 29 import org.apache.hadoop.hbase.ServerName; 30 import org.apache.hadoop.hbase.classification.InterfaceAudience; 31 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; 32 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; 33 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; 34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; 35 import org.apache.hadoop.hbase.util.Bytes; 36 37 import java.util.concurrent.ConcurrentHashMap; 38 import java.util.concurrent.ConcurrentSkipListMap; 39 import java.util.concurrent.ConcurrentMap; 40 import java.util.concurrent.ThreadPoolExecutor; 41 import java.util.concurrent.TimeUnit; 42 43 /** 44 * This class is for maintaining the various connection statistics and publishing them through 45 * the metrics interfaces. 46 * 47 * This class manages its own {@link MetricsRegistry} and {@link JmxReporter} so as to not 48 * conflict with other uses of Yammer Metrics within the client application. Instantiating 49 * this class implicitly creates and "starts" instances of these classes; be sure to call 50 * {@link #shutdown()} to terminate the thread pools they allocate. 51 */ 52 @InterfaceAudience.Private 53 public class MetricsConnection { 54 55 /** Set this key to {@code true} to enable metrics collection of client requests. */ 56 public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; 57 58 private static final String DRTN_BASE = "rpcCallDurationMs_"; 59 private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; 60 private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; 61 private static final String MEMLOAD_BASE = "memstoreLoad_"; 62 private static final String HEAP_BASE = "heapOccupancy_"; 63 private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); 64 65 /** A container class for collecting details about the RPC call as it percolates. */ 66 public static class CallStats { 67 private long requestSizeBytes = 0; 68 private long responseSizeBytes = 0; 69 private long startTime = 0; 70 private long callTimeMs = 0; 71 getRequestSizeBytes()72 public long getRequestSizeBytes() { 73 return requestSizeBytes; 74 } 75 setRequestSizeBytes(long requestSizeBytes)76 public void setRequestSizeBytes(long requestSizeBytes) { 77 this.requestSizeBytes = requestSizeBytes; 78 } 79 getResponseSizeBytes()80 public long getResponseSizeBytes() { 81 return responseSizeBytes; 82 } 83 setResponseSizeBytes(long responseSizeBytes)84 public void setResponseSizeBytes(long responseSizeBytes) { 85 this.responseSizeBytes = responseSizeBytes; 86 } 87 getStartTime()88 public long getStartTime() { 89 return startTime; 90 } 91 setStartTime(long startTime)92 public void setStartTime(long startTime) { 93 this.startTime = startTime; 94 } 95 getCallTimeMs()96 public long getCallTimeMs() { 97 return callTimeMs; 98 } 99 setCallTimeMs(long callTimeMs)100 public void setCallTimeMs(long callTimeMs) { 101 this.callTimeMs = callTimeMs; 102 } 103 } 104 105 @VisibleForTesting 106 protected static final class CallTracker { 107 private final String name; 108 @VisibleForTesting final Timer callTimer; 109 @VisibleForTesting final Histogram reqHist; 110 @VisibleForTesting final Histogram respHist; 111 CallTracker(MetricsRegistry registry, String name, String subName, String scope)112 private CallTracker(MetricsRegistry registry, String name, String subName, String scope) { 113 StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name); 114 if (subName != null) { 115 sb.append("(").append(subName).append(")"); 116 } 117 this.name = sb.toString(); 118 this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope); 119 this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope); 120 this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope); 121 } 122 CallTracker(MetricsRegistry registry, String name, String scope)123 private CallTracker(MetricsRegistry registry, String name, String scope) { 124 this(registry, name, null, scope); 125 } 126 updateRpc(CallStats stats)127 public void updateRpc(CallStats stats) { 128 this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); 129 this.reqHist.update(stats.getRequestSizeBytes()); 130 this.respHist.update(stats.getResponseSizeBytes()); 131 } 132 133 @Override toString()134 public String toString() { 135 return "CallTracker:" + name; 136 } 137 } 138 139 protected static class RegionStats { 140 final String name; 141 final Histogram memstoreLoadHist; 142 final Histogram heapOccupancyHist; 143 RegionStats(MetricsRegistry registry, String name)144 public RegionStats(MetricsRegistry registry, String name) { 145 this.name = name; 146 this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class, 147 MEMLOAD_BASE + this.name); 148 this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class, 149 HEAP_BASE + this.name); 150 } 151 update(ClientProtos.RegionLoadStats regionStatistics)152 public void update(ClientProtos.RegionLoadStats regionStatistics) { 153 this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad()); 154 this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy()); 155 } 156 } 157 158 @VisibleForTesting 159 protected static class RunnerStats { 160 final Counter normalRunners; 161 final Counter delayRunners; 162 final Histogram delayIntevalHist; 163 RunnerStats(MetricsRegistry registry)164 public RunnerStats(MetricsRegistry registry) { 165 this.normalRunners = registry.newCounter(MetricsConnection.class, "normalRunnersCount"); 166 this.delayRunners = registry.newCounter(MetricsConnection.class, "delayRunnersCount"); 167 this.delayIntevalHist = registry.newHistogram(MetricsConnection.class, "delayIntervalHist"); 168 } 169 incrNormalRunners()170 public void incrNormalRunners() { 171 this.normalRunners.inc(); 172 } 173 incrDelayRunners()174 public void incrDelayRunners() { 175 this.delayRunners.inc(); 176 } 177 updateDelayInterval(long interval)178 public void updateDelayInterval(long interval) { 179 this.delayIntevalHist.update(interval); 180 } 181 } 182 183 @VisibleForTesting 184 protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats 185 = new ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>>(); 186 updateServerStats(ServerName serverName, byte[] regionName, Object r)187 public void updateServerStats(ServerName serverName, byte[] regionName, 188 Object r) { 189 if (!(r instanceof Result)) { 190 return; 191 } 192 Result result = (Result) r; 193 ClientProtos.RegionLoadStats stats = result.getStats(); 194 if(stats == null){ 195 return; 196 } 197 String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); 198 ConcurrentMap<byte[], RegionStats> rsStats = null; 199 if (serverStats.containsKey(serverName)) { 200 rsStats = serverStats.get(serverName); 201 } else { 202 rsStats = serverStats.putIfAbsent(serverName, 203 new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR)); 204 if (rsStats == null) { 205 rsStats = serverStats.get(serverName); 206 } 207 } 208 RegionStats regionStats = null; 209 if (rsStats.containsKey(regionName)) { 210 regionStats = rsStats.get(regionName); 211 } else { 212 regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name)); 213 if (regionStats == null) { 214 regionStats = rsStats.get(regionName); 215 } 216 } 217 regionStats.update(stats); 218 } 219 220 221 /** A lambda for dispatching to the appropriate metric factory method */ 222 private static interface NewMetric<T> { newMetric(Class<?> clazz, String name, String scope)223 T newMetric(Class<?> clazz, String name, String scope); 224 } 225 226 /** Anticipated number of metric entries */ 227 private static final int CAPACITY = 50; 228 /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */ 229 private static final float LOAD_FACTOR = 0.75f; 230 /** 231 * Anticipated number of concurrent accessor threads, from 232 * {@link ConnectionManager.HConnectionImplementation#getBatchPool()} 233 */ 234 private static final int CONCURRENCY_LEVEL = 256; 235 236 private final MetricsRegistry registry; 237 private final JmxReporter reporter; 238 private final String scope; 239 240 private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() { 241 @Override public Timer newMetric(Class<?> clazz, String name, String scope) { 242 return registry.newTimer(clazz, name, scope); 243 } 244 }; 245 246 private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() { 247 @Override public Histogram newMetric(Class<?> clazz, String name, String scope) { 248 return registry.newHistogram(clazz, name, scope); 249 } 250 }; 251 252 // static metrics 253 254 @VisibleForTesting protected final Counter metaCacheHits; 255 @VisibleForTesting protected final Counter metaCacheMisses; 256 @VisibleForTesting protected final CallTracker getTracker; 257 @VisibleForTesting protected final CallTracker scanTracker; 258 @VisibleForTesting protected final CallTracker appendTracker; 259 @VisibleForTesting protected final CallTracker deleteTracker; 260 @VisibleForTesting protected final CallTracker incrementTracker; 261 @VisibleForTesting protected final CallTracker putTracker; 262 @VisibleForTesting protected final CallTracker multiTracker; 263 @VisibleForTesting protected final RunnerStats runnerStats; 264 265 // dynamic metrics 266 267 // These maps are used to cache references to the metric instances that are managed by the 268 // registry. I don't think their use perfectly removes redundant allocations, but it's 269 // a big improvement over calling registry.newMetric each time. 270 @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers = 271 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 272 @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms = 273 new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */, 274 LOAD_FACTOR, CONCURRENCY_LEVEL); 275 MetricsConnection(final ConnectionManager.HConnectionImplementation conn)276 public MetricsConnection(final ConnectionManager.HConnectionImplementation conn) { 277 this.scope = conn.toString(); 278 this.registry = new MetricsRegistry(); 279 final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); 280 final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); 281 282 this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope, 283 new RatioGauge() { 284 @Override protected double getNumerator() { 285 return batchPool.getActiveCount(); 286 } 287 @Override protected double getDenominator() { 288 return batchPool.getMaximumPoolSize(); 289 } 290 }); 291 this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope, 292 new RatioGauge() { 293 @Override protected double getNumerator() { 294 return metaPool.getActiveCount(); 295 } 296 @Override protected double getDenominator() { 297 return metaPool.getMaximumPoolSize(); 298 } 299 }); 300 this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope); 301 this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope); 302 this.getTracker = new CallTracker(this.registry, "Get", scope); 303 this.scanTracker = new CallTracker(this.registry, "Scan", scope); 304 this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope); 305 this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope); 306 this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope); 307 this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); 308 this.multiTracker = new CallTracker(this.registry, "Multi", scope); 309 this.runnerStats = new RunnerStats(this.registry); 310 311 this.reporter = new JmxReporter(this.registry); 312 this.reporter.start(); 313 } 314 shutdown()315 public void shutdown() { 316 this.reporter.shutdown(); 317 this.registry.shutdown(); 318 } 319 320 /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ newCallStats()321 public static CallStats newCallStats() { 322 // TODO: instance pool to reduce GC? 323 return new CallStats(); 324 } 325 326 /** Increment the number of meta cache hits. */ incrMetaCacheHit()327 public void incrMetaCacheHit() { 328 metaCacheHits.inc(); 329 } 330 331 /** Increment the number of meta cache misses. */ incrMetaCacheMiss()332 public void incrMetaCacheMiss() { 333 metaCacheMisses.inc(); 334 } 335 336 /** Increment the number of normal runner counts. */ incrNormalRunners()337 public void incrNormalRunners() { 338 this.runnerStats.incrNormalRunners(); 339 } 340 341 /** Increment the number of delay runner counts. */ incrDelayRunners()342 public void incrDelayRunners() { 343 this.runnerStats.incrDelayRunners(); 344 } 345 346 /** Update delay interval of delay runner. */ updateDelayInterval(long interval)347 public void updateDelayInterval(long interval) { 348 this.runnerStats.updateDelayInterval(interval); 349 } 350 351 /** 352 * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. 353 */ getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory)354 private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) { 355 T t = map.get(key); 356 if (t == null) { 357 t = factory.newMetric(this.getClass(), key, scope); 358 map.putIfAbsent(key, t); 359 } 360 return t; 361 } 362 363 /** Update call stats for non-critical-path methods */ updateRpcGeneric(MethodDescriptor method, CallStats stats)364 private void updateRpcGeneric(MethodDescriptor method, CallStats stats) { 365 final String methodName = method.getService().getName() + "_" + method.getName(); 366 getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory) 367 .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); 368 getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory) 369 .update(stats.getRequestSizeBytes()); 370 getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory) 371 .update(stats.getResponseSizeBytes()); 372 } 373 374 /** Report RPC context to metrics system. */ updateRpc(MethodDescriptor method, Message param, CallStats stats)375 public void updateRpc(MethodDescriptor method, Message param, CallStats stats) { 376 // this implementation is tied directly to protobuf implementation details. would be better 377 // if we could dispatch based on something static, ie, request Message type. 378 if (method.getService() == ClientService.getDescriptor()) { 379 switch(method.getIndex()) { 380 case 0: 381 assert "Get".equals(method.getName()); 382 getTracker.updateRpc(stats); 383 return; 384 case 1: 385 assert "Mutate".equals(method.getName()); 386 final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); 387 switch(mutationType) { 388 case APPEND: 389 appendTracker.updateRpc(stats); 390 return; 391 case DELETE: 392 deleteTracker.updateRpc(stats); 393 return; 394 case INCREMENT: 395 incrementTracker.updateRpc(stats); 396 return; 397 case PUT: 398 putTracker.updateRpc(stats); 399 return; 400 default: 401 throw new RuntimeException("Unrecognized mutation type " + mutationType); 402 } 403 case 2: 404 assert "Scan".equals(method.getName()); 405 scanTracker.updateRpc(stats); 406 return; 407 case 3: 408 assert "BulkLoadHFile".equals(method.getName()); 409 // use generic implementation 410 break; 411 case 4: 412 assert "ExecService".equals(method.getName()); 413 // use generic implementation 414 break; 415 case 5: 416 assert "ExecRegionServerService".equals(method.getName()); 417 // use generic implementation 418 break; 419 case 6: 420 assert "Multi".equals(method.getName()); 421 multiTracker.updateRpc(stats); 422 return; 423 default: 424 throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); 425 } 426 } 427 // Fallback to dynamic registry lookup for DDL methods. 428 updateRpcGeneric(method, stats); 429 } 430 } 431