1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.executor
19
20import java.util.concurrent.ThreadPoolExecutor
21
22import scala.collection.JavaConverters._
23
24import com.codahale.metrics.{Gauge, MetricRegistry}
25import org.apache.hadoop.fs.FileSystem
26
27import org.apache.spark.metrics.source.Source
28
private[spark]
/**
 * Metrics source named "executor" that exposes gauges for the executor's task thread pool and
 * for Hadoop FileSystem I/O statistics of the "hdfs" and "file" schemes.
 *
 * All gauges are evaluated lazily: each read queries the thread pool or Hadoop's
 * `FileSystem.getAllStatistics` at that moment.
 *
 * @param threadPool the thread pool whose counters are reported under the "threadpool" prefix
 * @param executorId identifier of the executor; NOTE(review): not referenced inside this class
 */
class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {

  // Declared before any registration so that the constructor statements below, and the
  // helper they call, always see an initialized registry.
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  override val sourceName: String = "executor"

  /**
   * Finds the Hadoop statistics entry whose scheme equals `scheme`.
   *
   * @return the matching entry, or None if no entry in `FileSystem.getAllStatistics` matches
   */
  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    // `==` is null-safe, unlike calling `.equals` on a possibly-null scheme.
    FileSystem.getAllStatistics.asScala.find(stats => stats.getScheme == scheme)

  /**
   * Registers a gauge named `filesystem.<scheme>.<name>`.
   *
   * On every read, the gauge extracts one value with `f` from the current statistics for
   * `scheme`. If no statistics entry matches, it returns `defaultValue`. The lookup is repeated
   * on each read, so an entry that appears after this source is constructed is still reported.
   *
   * @return the registered gauge
   */
  private def registerFileSystemStat[T](
      scheme: String,
      name: String,
      f: FileSystem.Statistics => T,
      defaultValue: T): Gauge[T] = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  // Gauge for the approximate number of threads in the executor thread pool that are
  // actively executing tasks.
  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = threadPool.getActiveCount()
  })

  // Gauge for the executor thread pool's approximate total number of tasks that have completed.
  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = threadPool.getCompletedTaskCount()
  })

  // Gauge for the executor thread pool's current number of threads.
  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getPoolSize()
  })

  // Gauge for the executor thread pool's configured maximum number of threads
  // (ThreadPoolExecutor.getMaximumPoolSize). This is the upper bound on the pool size, not the
  // peak number of threads ever present in the pool.
  // NOTE(review): the historical peak is available via getLargestPoolSize(); this gauge keeps
  // reporting the configured maximum so the meaning of "maxPool_size" does not change.
  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getMaximumPoolSize()
  })

  // Gauges for Hadoop FileSystem I/O statistics, one set per scheme. Each gauge reports 0 until
  // a statistics entry for that scheme exists.
  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }
}
76