/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import java.util.concurrent.ThreadPoolExecutor

import scala.collection.JavaConverters._

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem

import org.apache.spark.metrics.source.Source

/**
 * Metrics source named "executor" that registers gauges describing the executor's task
 * thread pool and the Hadoop `FileSystem` statistics for the "hdfs" and "file" schemes.
 *
 * @param threadPool the thread pool whose state the `threadpool.*` gauges report; it is read
 *                   each time a gauge value is requested, not captured at construction
 * @param executorId identifier of the executor (not referenced by the gauges registered here)
 */
private[spark]
class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {

  /**
   * Finds the `FileSystem.Statistics` entry whose scheme equals `scheme`, if there is one.
   */
  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    // `==` is null-safe, so an entry reporting a null scheme is skipped instead of throwing
    // the NullPointerException that `getScheme.equals(...)` would raise.
    FileSystem.getAllStatistics.asScala.find(s => s.getScheme == scheme)

  /**
   * Registers a gauge named from ("filesystem", `scheme`, `name`) that reports `f` applied to
   * the statistics for `scheme`, or `defaultValue` when no statistics entry exists for it.
   *
   * @param scheme       filesystem scheme to look up, e.g. "hdfs"
   * @param name         last component of the metric name
   * @param f            extracts the reported value from the statistics entry
   * @param defaultValue value reported while no statistics entry matches `scheme`
   * @return the registered gauge
   */
  private def registerFileSystemStat[T](
      scheme: String,
      name: String,
      f: FileSystem.Statistics => T,
      defaultValue: T): Gauge[T] = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      // Looked up on every read rather than once at registration, so an entry that appears
      // after this gauge is registered is still reported.
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  // Must be initialized before the registrations below, which run as part of the constructor
  // body in declaration order.
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  override val sourceName: String = "executor"

  // Gauge for the (approximate) number of threads actively executing tasks in the thread pool.
  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = threadPool.getActiveCount()
  })

  // Gauge for the thread pool's approximate total number of tasks that have completed.
  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = threadPool.getCompletedTaskCount()
  })

  // Gauge for the thread pool's current number of threads.
  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getPoolSize()
  })

  // Gauge for the thread pool's configured maximum allowed number of threads. NOTE: this is
  // the upper bound from getMaximumPoolSize(), not the largest number of threads ever
  // simultaneously in the pool (that would be getLargestPoolSize()).
  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getMaximumPoolSize()
  })

  // Gauges for Hadoop FileSystem statistics, one set per scheme. Byte counters default to 0L
  // and operation counters to 0 while no statistics exist for the scheme.
  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }
}