/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File
import java.net.Socket

import scala.collection.mutable
import scala.util.Properties

import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage._
import org.apache.spark.util.{RpcUtils, Utils}

/**
 * :: DeveloperApi ::
 * Holds all the runtime environment objects for a running Spark instance (either master or
 * worker), including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
 * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
 * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
 *
 * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
 *       in a future release.
 */
@DeveloperApi
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging {

  private[spark] var isStopped = false
  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

  // A general, soft-reference map for metadata needed during HadoopRDD split computation
  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

  private[spark] var driverTmpDir: Option[String] = None

  private[spark] def stop() {

    if (!isStopped) {
      isStopped = true
      pythonWorkers.values.foreach(_.stop())
      mapOutputTracker.stop()
      shuffleManager.stop()
      broadcastManager.stop()
      blockManager.stop()
      blockManager.master.stop()
      metricsSystem.stop()
      outputCommitCoordinator.stop()
      rpcEnv.shutdown()
      rpcEnv.awaitTermination()

      // If we only stop the SparkContext while the driver process keeps running as a service,
      // we need to delete the temp dir created by the driver; otherwise too many temp dirs will
      // accumulate. Only the driver creates such a dir, so there is nothing to do on executors.
      driverTmpDir match {
        case Some(path) =>
          try {
            Utils.deleteRecursively(new File(path))
          } catch {
            case e: Exception =>
              logWarning(s"Exception while deleting Spark temp dir: $path", e)
          }
        case None => // Only the driver creates a temp dir, so do nothing on executors
      }
    }
  }

  private[spark]
  def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
    synchronized {
      val key = (pythonExec, envVars)
      pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
    }
  }

  private[spark]
  def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
    synchronized {
      val key = (pythonExec, envVars)
      pythonWorkers.get(key).foreach(_.stopWorker(worker))
    }
  }

  private[spark]
  def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
    synchronized {
      val key = (pythonExec, envVars)
      pythonWorkers.get(key).foreach(_.releaseWorker(worker))
    }
  }
}

object SparkEnv extends Logging {
  @volatile private var env: SparkEnv = _

  private[spark] val driverSystemName = "sparkDriver"
  private[spark] val executorSystemName = "sparkExecutor"

  def set(e: SparkEnv) {
    env = e
  }

  /**
   * Returns the SparkEnv.
   */
  def get: SparkEnv = {
    env
  }

  /**
   * Create a SparkEnv for the driver.
   */
  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    val port = conf.get("spark.driver.port").toInt
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      port,
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

  /**
   * Create a SparkEnv for an executor.
   * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
   */
  private[spark] def createExecutorEnv(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      port: Int,
      numCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      isLocal: Boolean): SparkEnv = {
    val env = create(
      conf,
      executorId,
      hostname,
      hostname,
      port,
      isLocal,
      numCores,
      ioEncryptionKey
    )
    SparkEnv.set(env)
    env
  }

  /**
   * Helper method to create a SparkEnv for a driver or an executor.
   */
  private def create(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

    // Listener bus is only used on the driver
    if (isDriver) {
      assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
    }

    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    ioEncryptionKey.foreach { _ =>
      if (!securityManager.isSaslEncryptionEnabled()) {
        logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
          "wire.")
      }
    }

    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
      securityManager, clientMode = !isDriver)

    // Figure out which port the RpcEnv actually bound to, in case the original port is 0
    // or occupied. In the non-driver case, the RPC env's address may be null since it may
    // not be listening for incoming connections.
    if (isDriver) {
      conf.set("spark.driver.port", rpcEnv.address.port.toString)
    } else if (rpcEnv.address != null) {
      conf.set("spark.executor.port", rpcEnv.address.port.toString)
      logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
    }

    // Create an instance of the class with the given name, possibly initializing it with our conf
    def instantiateClass[T](className: String): T = {
      val cls = Utils.classForName(className)
      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
      // SparkConf, then one taking no arguments
      try {
        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
          .newInstance(conf, new java.lang.Boolean(isDriver))
          .asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          try {
            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              cls.getConstructor().newInstance().asInstanceOf[T]
          }
      }
    }

    // Create an instance of the class named by the given SparkConf property, or defaultClassName
    // if the property is not set, possibly initializing it with our conf
    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
      instantiateClass[T](conf.get(propertyName, defaultClassName))
    }

    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

    val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

    val metricsSystem = if (isDriver) {
      // Don't start the metrics system right now for the driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

    // Keep a reference to the temp dir created by the driver so it can be deleted when stop() is
    // called. The driver may run as a long-lived service, and if this temp dir is not deleted
    // when the SparkContext is stopped, too many temp dirs will accumulate. Only the driver
    // needs to do this.
    if (isDriver) {
      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      envInstance.driverTmpDir = Some(sparkFilesDir)
    }

    envInstance
  }

  /**
   * Return a map representation of jvm information, Spark properties, system properties, and
   * class paths. Map keys define the category, and map values represent the corresponding
   * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
   */
  private[spark]
  def environmentDetails(
      conf: SparkConf,
      schedulingMode: String,
      addedJars: Seq[String],
      addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

    import Properties._
    val jvmInformation = Seq(
      ("Java Version", s"$javaVersion ($javaVendor)"),
      ("Java Home", javaHome),
      ("Scala Version", versionString)
    ).sorted

    // Spark properties
    // This includes the scheduling mode whether or not it is configured (used by SparkUI)
    val schedulerMode =
      if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
      } else {
        Seq[(String, String)]()
      }
    val sparkProperties = (conf.getAll ++ schedulerMode).sorted

    // System properties that are not java classpaths
    val systemProperties = Utils.getSystemProperties.toSeq
    val otherProperties = systemProperties.filter { case (k, _) =>
      k != "java.class.path" && !k.startsWith("spark.")
    }.sorted

    // Class paths including all added jars and files
    val classPathEntries = javaClassPath
      .split(File.pathSeparator)
      .filterNot(_.isEmpty)
      .map((_, "System Classpath"))
    val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
    val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

    Map[String, Seq[(String, String)]](
      "JVM Information" -> jvmInformation,
      "Spark Properties" -> sparkProperties,
      "System Properties" -> otherProperties,
      "Classpath Entries" -> classPaths)
  }
}
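
// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of the original file): once a
// SparkContext has been created, the driver-side SparkEnv is installed in the
// process-wide holder above and any thread in the same JVM can reach it via
// SparkEnv.get. The object and application names below are hypothetical and
// exist only to demonstrate the accessor and a few of the services it exposes.
// ---------------------------------------------------------------------------
private[spark] object SparkEnvUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("spark-env-sketch")
    val sc = new SparkContext(conf)   // creating the context installs the driver SparkEnv

    val env = SparkEnv.get
    // A few of the runtime services held by SparkEnv:
    println(s"executor id    = ${env.executorId}")               // "driver" on the driver side
    println(s"serializer     = ${env.serializer.getClass.getName}")
    println(s"memory manager = ${env.memoryManager.getClass.getName}")

    sc.stop()   // SparkContext.stop() also stops the SparkEnv
  }
}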