/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File
import java.net.Socket

import scala.collection.mutable
import scala.util.Properties

import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage._
import org.apache.spark.util.{RpcUtils, Utils}

/**
 * :: DeveloperApi ::
 * Holds all the runtime environment objects for a running Spark instance (either master or worker),
 * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
 * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
 * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
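 *
 * For example (a minimal sketch, assuming a SparkContext has already been created):
 * {{{
 *   val env = SparkEnv.get
 *   val serializer = env.serializer
 * }}}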
 *
 * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
 *       in a future release.
 */
@DeveloperApi
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging {

  private[spark] var isStopped = false
  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

  // A general, soft-reference map for metadata needed during HadoopRDD split computation
  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

  private[spark] var driverTmpDir: Option[String] = None

  private[spark] def stop() {

    if (!isStopped) {
      isStopped = true
      pythonWorkers.values.foreach(_.stop())
      mapOutputTracker.stop()
      shuffleManager.stop()
      broadcastManager.stop()
      blockManager.stop()
      blockManager.master.stop()
      metricsSystem.stop()
      outputCommitCoordinator.stop()
      rpcEnv.shutdown()
      rpcEnv.awaitTermination()
      // If only the SparkContext is stopped while the driver process keeps running as a service,
      // we need to delete the temp dir here; otherwise each new context would leave behind
      // another temp dir. Only the temp dir created by the driver needs to be deleted.
      driverTmpDir match {
        case Some(path) =>
          try {
            Utils.deleteRecursively(new File(path))
          } catch {
            case e: Exception =>
              logWarning(s"Exception while deleting Spark temp dir: $path", e)
          }
        case None => // Only the driver creates this temp dir, so there is nothing to delete here
      }
    }
  }

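  // Returns a socket connected to a Python worker for the given Python executable and environment,
  // creating a PythonWorkerFactory for that (pythonExec, envVars) pair on first use.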
  private[spark]
  def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
    synchronized {
      val key = (pythonExec, envVars)
      pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
    }
  }

  private[spark]
  def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
    synchronized {
      val key = (pythonExec, envVars)
      pythonWorkers.get(key).foreach(_.stopWorker(worker))
    }
  }

  private[spark]
  def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
    synchronized {
      val key = (pythonExec, envVars)
      pythonWorkers.get(key).foreach(_.releaseWorker(worker))
    }
  }
}

object SparkEnv extends Logging {
  @volatile private var env: SparkEnv = _

  private[spark] val driverSystemName = "sparkDriver"
  private[spark] val executorSystemName = "sparkExecutor"

  def set(e: SparkEnv) {
    env = e
  }

  /**
   * Returns the SparkEnv.
   */
  def get: SparkEnv = {
    env
  }

  /**
   * Create a SparkEnv for the driver.
   */
  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    val port = conf.get("spark.driver.port").toInt
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      port,
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

  /**
   * Create a SparkEnv for an executor.
   * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
   */
  private[spark] def createExecutorEnv(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      port: Int,
      numCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      isLocal: Boolean): SparkEnv = {
    val env = create(
      conf,
      executorId,
      hostname,
      hostname,
      port,
      isLocal,
      numCores,
      ioEncryptionKey
    )
    SparkEnv.set(env)
    env
  }

  /**
   * Helper method to create a SparkEnv for a driver or an executor.
   */
  private def create(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

    // Listener bus is only used on the driver
    if (isDriver) {
      assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
    }

    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    ioEncryptionKey.foreach { _ =>
      if (!securityManager.isSaslEncryptionEnabled()) {
        logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
          "wire.")
      }
    }

    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
      securityManager, clientMode = !isDriver)

    // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
    // In the non-driver case, the RPC env's address may be null since it may not be listening
    // for incoming connections.
    if (isDriver) {
      conf.set("spark.driver.port", rpcEnv.address.port.toString)
    } else if (rpcEnv.address != null) {
      conf.set("spark.executor.port", rpcEnv.address.port.toString)
      logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
    }

    // Create an instance of the class with the given name, possibly initializing it with our conf
    def instantiateClass[T](className: String): T = {
      val cls = Utils.classForName(className)
      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
      // SparkConf, then one taking no arguments
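      // For example, a hypothetical pluggable implementation could declare any of these
      // constructor shapes (tried in this order):
      //   class MyShuffleManager(conf: SparkConf, isDriver: Boolean) extends ShuffleManager
      //   class MyShuffleManager(conf: SparkConf) extends ShuffleManager
      //   class MyShuffleManager() extends ShuffleManager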
      try {
        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
          .newInstance(conf, new java.lang.Boolean(isDriver))
          .asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          try {
            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              cls.getConstructor().newInstance().asInstanceOf[T]
          }
      }
    }

    // Create an instance of the class named by the given SparkConf property, or defaultClassName
    // if the property is not set, possibly initializing it with our conf
    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
      instantiateClass[T](conf.get(propertyName, defaultClassName))
    }

    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)

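    // On the driver, register the given RPC endpoint; on an executor, look up a reference to the
    // driver's endpoint with the same name instead.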
    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

    // Let the user specify short names for shuffle managers
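    // ("sort", the default, and "tungsten-sort" both resolve to SortShuffleManager); a fully
    // qualified class name is also accepted.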
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

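    // Choose the memory manager: spark.memory.useLegacyMode restores the static
    // execution/storage split used before Spark 1.6, while the default UnifiedMemoryManager lets
    // execution and storage borrow unused memory from each other.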
    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

    val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

    val metricsSystem = if (isDriver) {
      // Don't start the metrics system for the driver here; it is started later, once the task
      // scheduler has given us an app ID.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

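    // The OutputCommitCoordinator decides whether task attempts may commit output; the driver
    // hosts the authoritative endpoint and executors forward requests to it over RPC.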
    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

    // Keep a reference to the temp dir created by the driver so we can delete it when stop() is
    // called; this is only needed on the driver. The driver may run as a long-lived service, and
    // if the temp dir were not deleted when the SparkContext stops, temp dirs would pile up.
    if (isDriver) {
      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      envInstance.driverTmpDir = Some(sparkFilesDir)
    }

    envInstance
  }

  /**
   * Return a map representation of JVM information, Spark properties, system properties, and
   * class paths. Map keys define the category, and map values represent the corresponding
   * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
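   *
   * For example, the result has the shape (a sketch only, values elided):
   * {{{
   *   Map(
   *     "JVM Information"   -> Seq(("Java Version", ...), ("Java Home", ...), ...),
   *     "Spark Properties"  -> Seq(("spark.app.name", ...), ...),
   *     "System Properties" -> Seq(...),
   *     "Classpath Entries" -> Seq(...))
   * }}}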
   */
  private[spark]
  def environmentDetails(
      conf: SparkConf,
      schedulingMode: String,
      addedJars: Seq[String],
      addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

    import Properties._
    val jvmInformation = Seq(
      ("Java Version", s"$javaVersion ($javaVendor)"),
      ("Java Home", javaHome),
      ("Scala Version", versionString)
    ).sorted

    // Spark properties
    // Always include the scheduling mode, whether or not it is explicitly configured
    // (the SparkUI relies on it being present).
    val schedulerMode =
      if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
      } else {
        Seq[(String, String)]()
      }
    val sparkProperties = (conf.getAll ++ schedulerMode).sorted

    // System properties that are not java classpaths
    val systemProperties = Utils.getSystemProperties.toSeq
    val otherProperties = systemProperties.filter { case (k, _) =>
      k != "java.class.path" && !k.startsWith("spark.")
    }.sorted

    // Class paths including all added jars and files
    val classPathEntries = javaClassPath
      .split(File.pathSeparator)
      .filterNot(_.isEmpty)
      .map((_, "System Classpath"))
    val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
    val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

    Map[String, Seq[(String, String)]](
      "JVM Information" -> jvmInformation,
      "Spark Properties" -> sparkProperties,
      "System Properties" -> otherProperties,
      "Classpath Entries" -> classPaths)
  }
}