1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.storage 19 20import java.io._ 21import java.nio.ByteBuffer 22 23import scala.collection.mutable 24import scala.collection.mutable.HashMap 25import scala.concurrent.{Await, ExecutionContext, Future} 26import scala.concurrent.duration._ 27import scala.reflect.ClassTag 28import scala.util.Random 29import scala.util.control.NonFatal 30 31import com.google.common.io.ByteStreams 32 33import org.apache.spark._ 34import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} 35import org.apache.spark.internal.Logging 36import org.apache.spark.memory.{MemoryManager, MemoryMode} 37import org.apache.spark.network._ 38import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer} 39import org.apache.spark.network.netty.SparkTransportConf 40import org.apache.spark.network.shuffle.ExternalShuffleClient 41import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo 42import org.apache.spark.rpc.RpcEnv 43import org.apache.spark.security.CryptoStreamUtils 44import org.apache.spark.serializer.{SerializerInstance, SerializerManager} 45import org.apache.spark.shuffle.ShuffleManager 46import org.apache.spark.storage.memory._ 
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer


/**
 * Class for returning a fetched block and associated metrics.
 *
 * @param data iterator over the values of the block
 * @param readMethod how the block was read (memory, disk or network)
 * @param bytes size figure reported together with the read
 */
private[spark] class BlockResult(
    val data: Iterator[Any],
    val readMethod: DataReadMethod.Value,
    val bytes: Long)

/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that [[initialize()]] must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  // Whether shuffle files are served by an external shuffle service ("spark.shuffle.service.enabled").
  private[spark] val externalShuffleServiceEnabled =
    conf.getBoolean("spark.shuffle.service.enabled", false)

  val diskBlockManager = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }

  // Visible for testing
  private[storage] val blockInfoManager = new BlockInfoManager

  // Cached daemon thread pool used for the blocking asynchronous work started by this class
  // (asynchronous re-registration and block replication).
  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

  // Actual storage of where blocks are kept
  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
  memoryManager.setMemoryStore(memoryStore)

  // Note: depending on the memory manager, `maxMemory` may actually vary over time.
  // However, since we use this only for reporting and logging, what we actually want here is
  // the absolute maximum value that `maxMemory` can ever possibly reach. We may need
  // to revisit whether reporting this value as the "max" is intuitive to the user.
  private val maxMemory =
    memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory

  // Port used by the external shuffle service. In Yarn mode, this may already be
  // set through the Hadoop configuration as the server is launched in the Yarn NM.
  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
      // an open port.  But we still need to tell our spark apps the right port to use.  So
      // only if the yarn config has the port set to 0, we prefer the value in the spark config
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }

  // Identity of this block manager; assigned in initialize().
  var blockManagerId: BlockManagerId = _

  // Address of the server that serves this executor's shuffle files. This is either an external
  // service, or just our own Executor's BlockManager.
  private[spark] var shuffleServerId: BlockManagerId = _

  // Client to read other executors' shuffle files. This is either an external service, or just the
  // standard BlockTransferService to directly connect to other Executors.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
      securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }

  // Max number of failures before this block manager refreshes the block locations from the driver
  private val maxFailuresBeforeLocationRefresh =
    conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

  // RPC endpoint of this block manager; it is handed to the master on (re-)registration.
  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

  // Pending re-registration action being executed asynchronously or null if none is pending.
  // Accesses should synchronize on asyncReregisterLock.
  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object

  // Field related to peer block managers that are necessary for block replication
  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L

  // Policy used to pick replication targets; instantiated in initialize().
  private var blockReplicationPolicy: BlockReplicationPolicy = _

  /**
   * Initializes the BlockManager with the given appId. This is not performed in the constructor as
   * the appId may not be known at BlockManager instantiation time (in particular for the driver,
   * where it is only learned after registration with the TaskScheduler).
*
   * This method initializes the BlockTransferService and ShuffleClient, instantiates the
   * configured BlockReplicationPolicy, registers with the BlockManagerMaster, and registers with
   * a local shuffle service if configured.
   *
   * @param appId application id handed to the shuffle client
   */
  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    // The replication policy class is configurable; it is loaded reflectively by name.
    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

    val idFromMaster = master.registerBlockManager(
      id,
      maxMemory,
      slaveEndpoint)

    // Prefer the id handed back by the master; fall back to the locally built one.
    blockManagerId = if (idFromMaster != null) idFromMaster else id

    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

  /**
   * Registers this executor's shuffle file layout with the external shuffle service.
   *
   * The registration is attempted up to `MAX_ATTEMPTS` times, sleeping `SLEEP_TIME_SECS` seconds
   * after every failed attempt except the last one.
   *
   * @throws SparkException if the last attempt fails with a non-fatal error
   */
  private def registerWithExternalShuffleServer(): Unit = {
    logInfo("Registering executor with local external shuffle service.")
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirs.map(_.toString),
      diskBlockManager.subDirsPerLocalDir,
      shuffleManager.getClass.getName)

    val MAX_ATTEMPTS = 3
    val SLEEP_TIME_SECS = 5

    // Explicit loop state instead of a `return` from inside a `for` closure: the latter is a
    // non-local return that is implemented by throwing a control-flow exception.
    var attempt = 1
    var registered = false
    while (!registered) {
      try {
        // Synchronous and will throw an exception if we cannot connect.
        shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
          shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
        registered = true
      } catch {
        case e: Exception if attempt < MAX_ATTEMPTS =>
          logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - attempt}"
            + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
          Thread.sleep(SLEEP_TIME_SECS * 1000)
          attempt += 1
        case NonFatal(e) =>
          // Only reached on the final attempt: give up and surface the cause to the caller.
          throw new SparkException("Unable to register with external shuffle server due to : " +
            e.getMessage, e)
      }
    }
  }

  /**
   * Report all blocks to the BlockManager again. This may be necessary if we are dropped
   * by the BlockManager and come back or if we become capable of recovering blocks on disk after
   * an executor crash.
   *
   * This function deliberately fails silently if the master returns false (indicating that
   * the slave needs to re-register). The error condition will be detected again by the next
   * heart beat attempt or new block registration and another try to re-register all blocks
   * will be made then.
*/
  private def reportAllBlocks(): Unit = {
    logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
    // Report blocks in iteration order and stop at the first one the master rejects. `find`
    // short-circuits, so no block after the rejected one is reported.
    val rejected = blockInfoManager.entries.find { case (blockId, info) =>
      val status = getCurrentBlockStatus(blockId, info)
      info.tellMaster && !tryToReportBlockStatus(blockId, status)
    }
    rejected.foreach { case (blockId, _) =>
      logError(s"Failed to report $blockId to master; giving up.")
    }
  }

  /**
   * Re-register with the master and report all blocks to it. This will be called by the heart beat
   * thread if our heartbeat to the block manager indicates that we were not registered.
   *
   * Note that this method must be called without any BlockInfo locks held.
   */
  def reregister(): Unit = {
    // TODO: We might need to rate limit re-registering.
    logInfo(s"BlockManager $blockManagerId re-registering with master")
    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
    reportAllBlocks()
  }

  /**
   * Re-register with the master sometime soon.
   *
   * At most one re-registration is pending at a time: if one is already in flight this call is
   * a no-op.
   */
  private def asyncReregister(): Unit = {
    asyncReregisterLock.synchronized {
      if (asyncReregisterTask == null) {
        asyncReregisterTask = Future[Unit] {
          // This is a blocking action and should run in futureExecutionContext which is a cached
          // thread pool
          try {
            reregister()
          } finally {
            // Always clear the pending marker, even when reregister() throws; otherwise a single
            // failed attempt would block every later asynchronous re-registration.
            asyncReregisterLock.synchronized {
              asyncReregisterTask = null
            }
          }
        }(futureExecutionContext)
      }
    }
  }

  /**
   * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
   */
  def waitForAsyncReregister(): Unit = {
    // Read the field under its lock, but wait outside of it: the task itself needs the lock to
    // clear the pending marker when it finishes.
    val task = asyncReregisterLock.synchronized { asyncReregisterTask }
    if (task != null) {
      try {
        Await.ready(task, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for async. reregistration", t)
      }
    }
  }

  /**
   * Interface to get local block data. Throws an exception if the block cannot be found or
   * cannot be read successfully.
*/
  override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      // Shuffle blocks are served by the shuffle manager's resolver.
      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
    } else {
      getLocalBytes(blockId)
        .map { buffer => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer) }
        .getOrElse {
          // A request for a block we do not hold most likely means the master's view of this
          // block is stale, so tell it that the block is not available here before failing.
          reportBlockStatus(blockId, BlockStatus.empty)
          throw new BlockNotFoundException(blockId.toString)
        }
    }
  }

  /**
   * Store the given block locally under the requested storage level.
   *
   * '''Important!''' The buffer behind `data` must be neither mutated nor released by the caller
   * afterwards; doing so may corrupt or change what the `BlockManager` has stored.
   *
   * @return the result of the underlying `putBytes` call
   */
  override def putBlockData(
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Boolean = {
    val chunked = new ChunkedByteBuffer(data.nioByteBuffer())
    putBytes(blockId, chunked, level)(classTag)
  }

  /**
   * Look up the BlockStatus of the block with the given ID, if this manager knows the block.
   * NOTE: This is mainly for testing.
   */
  def getStatus(blockId: BlockId): Option[BlockStatus] = {
    for (info <- blockInfoManager.get(blockId)) yield {
      // A store that does not hold the block contributes a size of zero.
      val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
      BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
    }
  }

  /**
   * Get the ids of existing blocks that match the given filter. Note that this will
   * query the blocks stored in the disk block manager (that the block manager
   * may not know of).
*/
  def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
    val trackedIds = blockInfoManager.entries.map { case (id, _) => id }
    val candidates = trackedIds ++ diskBlockManager.getAllBlocks()
    // Going through `toArray` materializes the result, so that a lazy iterator is never
    // serialized when this is used to answer a client request.
    candidates.filter(filter).toArray.toSeq
  }

  /**
   * Send the master the current storage status of a block. The update reflects where the block
   * actually is right now, *not* the storage level requested in its block info; e.g. a
   * MEMORY_AND_DISK block may by now live on disk only.
   *
   * `droppedMemorySize` covers the case where the block was dropped from memory to disk (and is
   * therefore still valid), so that the master can compensate for the memory freed on the slave.
   *
   * If the master asks for a re-registration, one is scheduled asynchronously.
   */
  private def reportBlockStatus(
      blockId: BlockId,
      status: BlockStatus,
      droppedMemorySize: Long = 0L): Unit = {
    val accepted = tryToReportBlockStatus(blockId, status, droppedMemorySize)
    if (!accepted) {
      logInfo(s"Got told to re-register updating block $blockId")
      // Re-registering will report our new block for free.
      asyncReregister()
    }
    logDebug(s"Told master about block $blockId")
  }

  /**
   * Actually send a UpdateBlockInfo message. Returns the master's response,
   * which will be true if the block was successfully recorded and false if
   * the slave needs to re-register.
*/
  private def tryToReportBlockStatus(
      blockId: BlockId,
      status: BlockStatus,
      droppedMemorySize: Long = 0L): Boolean = {
    // The in-memory size sent to the master is the larger of the current size and the size that
    // was just dropped from memory.
    master.updateBlockInfo(
      blockManagerId,
      blockId,
      status.storageLevel,
      Math.max(status.memSize, droppedMemorySize),
      status.diskSize)
  }

  /**
   * Compute the storage status the block with the given ID has right now. The returned storage
   * level and sizes describe where the block actually resides (memory and/or disk), which may
   * differ from the level recorded in `info` once the block has been dropped from memory and
   * possibly written to disk.
   */
  private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
    info.synchronized {
      val level = info.level
      if (level == null) {
        BlockStatus.empty
      } else {
        val inMem = level.useMemory && memoryStore.contains(blockId)
        val onDisk = level.useDisk && diskStore.contains(blockId)
        val present = inMem || onDisk
        val effectiveLevel = StorageLevel(
          useDisk = onDisk,
          useMemory = inMem,
          useOffHeap = level.useOffHeap,
          deserialized = inMem && level.deserialized,
          replication = if (present) level.replication else 1)
        val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
        BlockStatus(effectiveLevel, memSize, diskSize)
      }
    }
  }

  /**
   * Ask the master for the locations of each block in the given array.
   */
  private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
    val startTimeMs = System.currentTimeMillis
    val found = master.getLocations(blockIds).toArray
    logDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs)))
    found
  }

  /**
   * Cleanup code run in response to a failed local read.
   * Must be called while holding a read lock on the block.
*/
  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
    // Give up the caller's read lock first, then drop the block and fail the read.
    releaseLock(blockId)
    // Remove the missing block so that its unavailability is reported to the driver
    removeBlock(blockId)
    throw new SparkException(s"Block $blockId was not found even though it's read-locked")
  }

  /**
   * Get block from local block manager as an iterator of Java objects.
   *
   * On success the block stays read-locked; the lock is released through the completion
   * callback of the returned iterator.
   *
   * @return None if no read lock could be obtained for the block, otherwise the block's values
   *         together with the store they were read from
   */
  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
    logDebug(s"Getting local block $blockId")
    blockInfoManager.lockForReading(blockId) match {
      case None =>
        logDebug(s"Block $blockId was not found")
        None
      case Some(info) =>
        val level = info.level
        logDebug(s"Level for block $blockId is $level")
        if (level.useMemory && memoryStore.contains(blockId)) {
          // Memory hit: either hand out the stored objects or deserialize the stored bytes.
          val iter: Iterator[Any] = if (level.deserialized) {
            memoryStore.getValues(blockId).get
          } else {
            serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
        } else if (level.useDisk && diskStore.contains(blockId)) {
          // Disk hit: read the bytes and give the block a chance to be cached in memory again.
          val iterToReturn: Iterator[Any] = {
            val diskBytes = diskStore.getBytes(blockId)
            if (level.deserialized) {
              val diskValues = serializerManager.dataDeserializeStream(
                blockId,
                diskBytes.toInputStream(dispose = true))(info.classTag)
              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
            } else {
              // The stream over a buffer returned by the caching helper is opened with
              // dispose = false; only the stream over the raw disk bytes disposes its buffer.
              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
                .map {_.toInputStream(dispose = false)}
                .getOrElse { diskBytes.toInputStream(dispose = true) }
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
            }
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
        } else {
          // Read-locked but in neither store: release the lock, remove the block and throw.
          handleLocalReadFailure(blockId)
        }
    }
  }

  /**
   * Get block from the local block manager as serialized bytes.
   *
   * For non-shuffle blocks a read lock is acquired first; None is returned if it cannot be
   * obtained.
   */
  def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting local block $blockId as bytes")
    // As an optimization for map output fetches, if the block is for a shuffle, return it
    // without acquiring a lock; the disk store never deletes (recent) items so this should work
    if (blockId.isShuffle) {
      val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
      // TODO: This should gracefully handle case where local block is not available. Currently
      // downstream code will throw an exception.
      Option(
        new ChunkedByteBuffer(
          shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
    } else {
      blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
    }
  }

  /**
   * Get block from the local block manager as serialized bytes.
   *
   * Must be called while holding a read lock on the block.
   * Releases the read lock upon exception; keeps the read lock upon successful return.
   */
  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
    val level = info.level
    logDebug(s"Level for block $blockId is $level")
    // In order, try to read the serialized bytes from memory, then from disk, then fall back to
    // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
    if (level.deserialized) {
      // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
      if (level.useDisk && diskStore.contains(blockId)) {
        // Note: we purposely do not try to put the block back into memory here. Since this branch
        // handles deserialized blocks, this block may only be cached in memory as objects, not
        // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
        // cache the block's deserialized objects since that caching may not have a payoff.
        diskStore.getBytes(blockId)
      } else if (level.useMemory && memoryStore.contains(blockId)) {
        // The block was not found on disk, so serialize an in-memory copy:
        serializerManager.dataSerializeWithExplicitClassTag(
          blockId, memoryStore.getValues(blockId).get, info.classTag)
      } else {
        handleLocalReadFailure(blockId)
      }
    } else {  // storage level is serialized
      if (level.useMemory && memoryStore.contains(blockId)) {
        memoryStore.getBytes(blockId).get
      } else if (level.useDisk && diskStore.contains(blockId)) {
        val diskBytes = diskStore.getBytes(blockId)
        // Prefer the buffer returned by the caching helper, if any, over the raw disk bytes.
        maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
      } else {
        handleLocalReadFailure(blockId)
      }
    }
  }

  /**
   * Get block from remote block managers.
   *
   * This does not acquire a lock on this block in this JVM.
   */
  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val ct = implicitly[ClassTag[T]]
    getRemoteBytes(blockId).map { data =>
      // Deserialize the fetched bytes; the stream is opened with dispose = true.
      val values =
        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }

  /**
   * Return a list of locations for the given block, prioritizing the local machine since
   * multiple block managers can share the same host.
   */
  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
    // Shuffle first, then move the locations on this host to the front.
    val locs = Random.shuffle(master.getLocations(blockId))
    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
    preferredLocs ++ otherLocs
  }

  /**
   * Get block from remote block managers as serialized bytes.
*/
  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    // runningFailureCount counts failures since the last location refresh and decides when to
    // refresh; totalFailureCount counts all failures and decides when to give up.
    var runningFailureCount = 0
    var totalFailureCount = 0
    val locations = getLocations(blockId)
    // The failure budget is the number of locations known at the start; it is not recomputed
    // when the location list is refreshed below.
    val maxFetchFailures = locations.size
    var locationIterator = locations.iterator
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      logDebug(s"Getting remote block $blockId from $loc")
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
      } catch {
        case NonFatal(e) =>
          runningFailureCount += 1
          totalFailureCount += 1

          if (totalFailureCount >= maxFetchFailures) {
            // Give up trying anymore locations. Either we've tried all of the original locations,
            // or we've refreshed the list of locations from the master, and have still
            // hit failures after trying locations from the refreshed list.
            logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
              s"Most recent failure cause:", e)
            return None
          }

          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $runningFailureCount)", e)

          // If there is a large number of executors then locations list can contain a
          // large number of stale entries causing a large number of retries that may
          // take a significant amount of time. To get rid of these stale entries
          // we refresh the block locations after a certain number of fetch failures
          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
            locationIterator = getLocations(blockId).iterator
            logDebug(s"Refreshed locations from the driver " +
              s"after ${runningFailureCount} fetch failures.")
            runningFailureCount = 0
          }

          // This location failed, so we retry fetch from a different one by returning null here
          null
      }

      if (data != null) {
        return Some(new ChunkedByteBuffer(data))
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }

  /**
   * Get a block from the block manager (either local or remote).
   *
   * This acquires a read lock on the block if the block was stored locally and does not acquire
   * any locks if the block was fetched from a remote block manager. The read lock will
   * automatically be freed once the result's `data` iterator is fully consumed.
   */
  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    // Local lookup first; remote block managers are only consulted on a local miss.
    val local = getLocalValues(blockId)
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    val remote = getRemoteValues[T](blockId)
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }

  /**
   * Downgrades an exclusive write lock to a shared read lock.
   */
  def downgradeLock(blockId: BlockId): Unit = {
    blockInfoManager.downgradeLock(blockId)
  }

  /**
   * Release a lock on the given block.
   */
  def releaseLock(blockId: BlockId): Unit = {
    blockInfoManager.unlock(blockId)
  }

  /**
   * Registers a task with the BlockManager in order to initialize per-task bookkeeping structures.
*/
  def registerTask(taskAttemptId: Long): Unit = {
    blockInfoManager.registerTask(taskAttemptId)
  }

  /**
   * Release all locks for the given task.
   *
   * @return the blocks whose locks were released.
   */
  def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = {
    blockInfoManager.releaseAllLocksForTask(taskAttemptId)
  }

  /**
   * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
   * to compute the block, persist it, and return its values.
   *
   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
   *         could not be cached.
   */
  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // Attempt to read the block from local or remote storage. If it's present, then we don't need
    // to go through the local-get-or-put path.
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
        // Need to compute the block.
    }
    // Initially we hold no locks on this block.
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        // doPut() didn't hand work back to us, so the block already existed or was successfully
        // stored. Therefore, we now hold a read lock on the block.
        val blockResult = getLocalValues(blockId).getOrElse {
          // Since we held a read lock between the doPut() and get() calls, the block should not
          // have been evicted, so get() not returning the block indicates some internal error.
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        // We already hold a read lock on the block from the doPut() call and getLocalValues()
        // acquires the lock again, so we need to call releaseLock() here so that the net number
        // of lock acquisitions is 1 (since the caller will only call release() once).
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
        // that they can decide what to do with the values (e.g. process them without caching).
       Right(iter)
    }
  }

  /**
   * Store the values of the given iterator as a block with the given storage level.
   *
   * @return true if the block was stored or false if an error occurred.
   */
  def putIterator[T: ClassTag](
      blockId: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      tellMaster: Boolean = true): Boolean = {
    require(values != null, "Values is null")
    doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match {
      case None =>
        true
      case Some(iter) =>
        // Caller doesn't care about the iterator values, so we can close the iterator here
        // to free resources earlier
        iter.close()
        false
    }
  }

  /**
   * A short circuited method to get a block writer that can write data directly to disk.
   * The Block will be appended to the File specified by filename. Callers should handle error
   * cases.
*/
  def getDiskWriter(
      blockId: BlockId,
      file: File,
      serializerInstance: SerializerInstance,
      bufferSize: Int,
      writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
    // Whether writes are synced is read from "spark.shuffle.sync" on every call.
    new DiskBlockObjectWriter(
      file,
      serializerManager,
      serializerInstance,
      bufferSize,
      conf.getBoolean("spark.shuffle.sync", false),
      writeMetrics,
      blockId)
  }

  /**
   * Put a new block of serialized bytes to the block manager.
   *
   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
   * so may corrupt or change the data stored by the `BlockManager`.
   *
   * @param encrypt If true, asks the block manager to encrypt the data block before storing,
   *                when I/O encryption is enabled. This is required for blocks that have been
   *                read from unencrypted sources, since all the BlockManager read APIs
   *                automatically do decryption.
   * @return true if the block was stored or false if an error occurred.
*/
  def putBytes[T: ClassTag](
      blockId: BlockId,
      bytes: ChunkedByteBuffer,
      level: StorageLevel,
      tellMaster: Boolean = true,
      encrypt: Boolean = false): Boolean = {
    require(bytes != null, "Bytes is null")

    // Runs the input through a crypto output stream and returns the encrypted copy. The input
    // buffer is disposed afterwards, whether or not the encryption succeeded.
    def encryptAndDisposeInput(): ChunkedByteBuffer = {
      try {
        val plain = bytes.toByteBuffer
        val source = new ByteBufferInputStream(plain)
        val sink = new ByteBufferOutputStream(plain.remaining())
        val cryptoOut = CryptoStreamUtils.createCryptoOutputStream(sink, conf,
          securityManager.ioEncryptionKey.get)
        try {
          ByteStreams.copy(source, cryptoOut)
        } finally {
          source.close()
          cryptoOut.close()
        }
        new ChunkedByteBuffer(sink.toByteBuffer)
      } finally {
        bytes.dispose()
      }
    }

    // Encryption only happens when it was requested and an I/O encryption key is configured;
    // otherwise the bytes are stored exactly as they were handed in.
    val mustEncrypt = encrypt && securityManager.ioEncryptionKey.isDefined
    val bytesToStore = if (mustEncrypt) encryptAndDisposeInput() else bytes

    doPutBytes(blockId, bytesToStore, level, implicitly[ClassTag[T]], tellMaster)
  }

  /**
   * Put the given bytes according to the given level in one of the block stores, replicating
   * the values if necessary.
   *
   * If the block already exists, this method will not overwrite it.
   *
   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
   * so may corrupt or change the data stored by the `BlockManager`.
   *
   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
   *                     block already exists). If false, this method will hold no locks when it
   *                     returns.
   * @return true if the block was already present or if the put succeeded, false otherwise.
*/
  private def doPutBytes[T](
      blockId: BlockId,
      bytes: ChunkedByteBuffer,
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Boolean = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val localPutStartMs = System.currentTimeMillis

      // The data is already serialized and ready to send, so replication is started before the
      // bytes are stored locally. `replicate` is a blocking call, which is why it is submitted
      // to futureExecutionContext instead of being run inline.
      val pendingReplication =
        if (level.replication > 1) {
          Some(Future {
            replicate(blockId, bytes, level, classTag)
          }(futureExecutionContext))
        } else {
          None
        }

      val size = bytes.size

      // Deserializes the bytes and tries to keep the resulting values in the memory store.
      def putAsValues(): Boolean = {
        val values =
          serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
        memoryStore.putIteratorAsValues(blockId, values, classTag) match {
          case Left(leftover) =>
            // The bytes themselves can still go to disk, so the returned iterator is not
            // needed; close it right away to free its resources.
            leftover.close()
            false
          case Right(_) =>
            true
        }
      }

      // Tries to keep the serialized bytes in the memory store. The buffer is only copied into
      // direct memory when off-heap storage is requested and some chunk is not direct yet.
      def putAsBytes(): Boolean = {
        val memoryMode = level.memoryMode
        memoryStore.putBytes(blockId, size, memoryMode, () => {
          val needsDirectCopy =
            memoryMode == MemoryMode.OFF_HEAP && bytes.chunks.exists(chunk => !chunk.isDirect)
          if (needsDirectCopy) bytes.copy(Platform.allocateDirectBuffer) else bytes
        })
      }

      if (level.useMemory) {
        // Memory is tried first even when useDisk is also set; disk is only the fallback.
        val storedInMemory = if (level.deserialized) putAsValues() else putAsBytes()
        if (!storedInMemory && level.useDisk) {
          logWarning(s"Persisting block $blockId to disk instead.")
          diskStore.putBytes(blockId, bytes)
        }
      } else if (level.useDisk) {
        diskStore.putBytes(blockId, bytes)
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // The block now lives in the memory or disk store: record its size, optionally tell
        // the master, and note the update in the task metrics.
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
      }
      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(localPutStartMs)))

      // Wait for the asynchronous replication (if one was started) to finish.
      pendingReplication.foreach { replication =>
        try {
          Await.ready(replication, Duration.Inf)
        } catch {
          case NonFatal(t) =>
            throw new Exception("Error occurred while waiting for replication to finish", t)
        }
      }

      // doPut expects None on success and Some(...) on failure.
      if (blockWasSuccessfullyStored) None else Some(bytes)
    }.isEmpty
  }

  /**
   * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
   *
   * @param putBody a function which attempts the actual put() and returns None on success
   *                or Some on failure.
*/
  private def doPut[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[_],
      tellMaster: Boolean,
      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {

    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")

    val putBlockInfo = new BlockInfo(level, classTag, tellMaster)
    if (!blockInfoManager.lockNewBlockForWriting(blockId, putBlockInfo)) {
      logWarning(s"Block $blockId already exists on this machine; not re-adding it")
      if (!keepReadLock) {
        // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
        releaseLock(blockId)
      }
      None
    } else {
      val startTimeMs = System.currentTimeMillis
      // Flipped to true as soon as `putBody` returns; if it is still false in the `finally`
      // block, `putBody` threw.
      var putBodyReturned = false
      val outcome: Option[T] = try {
        val attempt = putBody(putBlockInfo)
        putBodyReturned = true
        attempt match {
          case None =>
            // The block was successfully stored: either keep a read lock or release the lock.
            if (keepReadLock) {
              blockInfoManager.downgradeLock(blockId)
            } else {
              blockInfoManager.unlock(blockId)
            }
          case Some(_) =>
            removeBlockInternal(blockId, tellMaster = false)
            logWarning(s"Putting block $blockId failed")
        }
        attempt
      } finally {
        // This cleanup is performed in a finally block rather than a `catch` to avoid having to
        // catch and properly re-throw InterruptedException.
        if (!putBodyReturned) {
          logWarning(s"Putting block $blockId failed due to an exception")
          // `putBody` may already have notified the master about the availability of this
          // block, so an update is sent to remove this block location again.
          removeBlockInternal(blockId, tellMaster = tellMaster)
          // `putBody` may also have added a block status to TaskMetrics; overwrite it with an
          // empty status. This is limited to the exception path because doing it
          // unconditionally would send empty block statuses for every block that failed to be
          // cached due to a memory shortage (an expected failure, unlike an uncaught exception).
          addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
        }
      }

      val timingFormat =
        if (level.replication > 1) {
          "Putting block %s with replication took %s"
        } else {
          "Putting block %s without replication took %s"
        }
      logDebug(timingFormat.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
      outcome
    }
  }

  /**
   * Put the given block according to the given level in one of the block stores, replicating
   * the values if necessary.
   *
   * If the block already exists, this method will not overwrite it.
   *
   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
   *                     block already exists). If false, this method will hold no locks when it
   *                     returns.
   * @return None if the block was already present or if the put succeeded, or Some(iterator)
   *         if the put failed.
*/
  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis

      // First element: size of the block in bytes (0 when nothing was stored).
      // Second element: the values handed back by a failed memory store put that could not be
      // spilled to disk either.
      val (size, unstoredValues): (Long, Option[PartiallyUnrolledIterator[T]]) =
        if (level.useMemory) {
          // Put it in memory first, even if it also has useDisk set to true;
          // it is dropped to disk only if the memory store can't hold it.
          if (level.deserialized) {
            memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
              case Right(storedSize) =>
                (storedSize, None)
              case Left(partialValues) if level.useDisk =>
                // Not enough space to unroll this block; drop to disk.
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { fileOutputStream =>
                  serializerManager.dataSerializeStream(
                    blockId, fileOutputStream, partialValues)(classTag)
                }
                (diskStore.getSize(blockId), None)
              case Left(partialValues) =>
                (0L, Some(partialValues))
            }
          } else { // !level.deserialized
            memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
              case Right(storedSize) =>
                (storedSize, None)
              case Left(partiallySerializedValues) if level.useDisk =>
                // Not enough space to unroll this block; drop to disk.
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { fileOutputStream =>
                  partiallySerializedValues.finishWritingToStream(fileOutputStream)
                }
                (diskStore.getSize(blockId), None)
              case Left(partiallySerializedValues) =>
                (0L, Some(partiallySerializedValues.valuesIterator))
            }
          }
        } else if (level.useDisk) {
          diskStore.put(blockId) { fileOutputStream =>
            serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
          }
          (diskStore.getSize(blockId), None)
        } else {
          (0L, None)
        }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory or disk store, tell the master about it.
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          // [SPARK-16550] Erase the typed classTag when using default serialization, since
          // NettyBlockRpcServer crashes when deserializing repl-defined classes.
          // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
          val remoteClassTag: ClassTag[_] =
            if (serializerManager.canUseKryo(classTag)) classTag else scala.reflect.classTag[Any]
          try {
            replicate(blockId, bytesToReplicate, level, remoteClassTag)
          } finally {
            bytesToReplicate.unmap()
          }
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }
      assert(blockWasSuccessfullyStored == unstoredValues.isEmpty)
      unstoredValues
    }
  }

  /**
   * Tries to move bytes that were read from the disk store into the MemoryStore so that later
   * reads are faster. The caller must hold a read lock on the block.
   *
   * @return the bytes as held by the memory store if the block is (now) cached there, otherwise
   *         None. When bytes from the memory store are returned, `diskBytes` has been disposed
   *         and must no longer be used by the caller. When None is returned, `diskBytes` is left
   *         untouched.
   */
  private def maybeCacheDiskBytesInMemory(
      blockInfo: BlockInfo,
      blockId: BlockId,
      level: StorageLevel,
      diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
    require(!level.deserialized)
    if (!level.useMemory) {
      None
    } else {
      // Synchronize on blockInfo to guard against a race condition where two readers both try to
      // put values read from disk into the MemoryStore.
      blockInfo.synchronized {
        // Releases the disk copy and hands out the memory store's copy instead.
        def switchToMemoryCopy(): Option[ChunkedByteBuffer] = {
          diskBytes.dispose()
          Some(memoryStore.getBytes(blockId).get)
        }

        if (memoryStore.contains(blockId)) {
          switchToMemoryCopy()
        } else {
          val allocator = level.memoryMode match {
            case MemoryMode.ON_HEAP => ByteBuffer.allocate _
            case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
          }
          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
            // https://issues.apache.org/jira/browse/SPARK-6076
            // If the file size is bigger than the free memory, OOM will happen. So the copy must
            // not be created unless the block can actually be put into the MemoryStore. That's
            // why it is built inside this `() => ChunkedByteBuffer`, i.e. lazily.
            diskBytes.copy(allocator)
          })
          if (putSucceeded) switchToMemoryCopy() else None
        }
      }
    }
  }

  /**
   * Attempts to cache spilled values read from disk into the MemoryStore in order to speed up
   * subsequent reads. This method requires the caller to hold a read lock on the block.
   *
   * @return a copy of the iterator. The original iterator passed this method should no longer
   *         be used after this method returns.
*/
  private def maybeCacheDiskValuesInMemory[T](
      blockInfo: BlockInfo,
      blockId: BlockId,
      level: StorageLevel,
      diskIterator: Iterator[T]): Iterator[T] = {
    require(level.deserialized)
    val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]]
    if (!level.useMemory) {
      diskIterator
    } else {
      // Synchronize on blockInfo to guard against a race condition where two readers both try to
      // put values read from disk into the MemoryStore.
      val values = blockInfo.synchronized {
        def readFromMemory() = memoryStore.getValues(blockId).get

        if (memoryStore.contains(blockId)) {
          // Note: if we had a means to discard the disk iterator, we would do that here.
          readFromMemory()
        } else {
          memoryStore.putIteratorAsValues(blockId, diskIterator, classTag) match {
            case Right(_) =>
              // The put() succeeded, so the values can be read back:
              readFromMemory()
            case Left(returnedIterator) =>
              // The memory store put() failed and handed the iterator back to us:
              returnedIterator
          }
        }
      }
      values.asInstanceOf[Iterator[T]]
    }
  }

  /**
   * Returns the peer block managers, as cached or as freshly fetched from the master.
   *
   * The cached list is refreshed when there is none yet, when `forceFetch` is set, or when it is
   * older than "spark.storage.cachedPeersTtl" milliseconds (default: 60000).
   */
  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
    peerFetchLock.synchronized {
      val ttlMs = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
      val cacheExpired = System.currentTimeMillis - lastPeerFetchTime > ttlMs
      val mustRefresh = cachedPeers == null || forceFetch || cacheExpired
      if (mustRefresh) {
        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
        lastPeerFetchTime = System.currentTimeMillis
        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
      }
      cachedPeers
    }
  }

  /**
   * Replicate block to another node. Note that this is a blocking call that returns after
   * the block has been replicated.
*/
  private def replicate(
      blockId: BlockId,
      data: ChunkedByteBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Unit = {

    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    // Storage level sent to the peers: same flags as `level`, but with replication = 1.
    val tLevel = StorageLevel(
      useDisk = level.useDisk,
      useMemory = level.useMemory,
      useOffHeap = level.useOffHeap,
      deserialized = level.deserialized,
      replication = 1)

    val numPeersToReplicateTo = level.replication - 1
    val startTime = System.nanoTime

    val peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
    val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
    var numFailures = 0

    var peersForReplication = blockReplicationPolicy.prioritize(
      blockManagerId,
      getPeers(false),
      mutable.HashSet.empty,
      blockId,
      numPeersToReplicateTo)

    // Keep going while the failure budget is not exhausted, candidates remain, and the
    // requested number of replicas has not been reached.
    def shouldKeepTrying: Boolean =
      numFailures <= maxReplicationFailures &&
        !peersForReplication.isEmpty &&
        peersReplicatedTo.size != numPeersToReplicateTo

    // Synchronously uploads the block to a single peer and traces how long that took.
    def uploadTo(peer: BlockManagerId): Unit = {
      val onePeerStartTime = System.nanoTime
      logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
      blockTransferService.uploadBlockSync(
        peer.host,
        peer.port,
        peer.executorId,
        blockId,
        new NettyManagedBuffer(data.toNetty),
        tLevel,
        classTag)
      logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
        s" in ${(System.nanoTime - onePeerStartTime).toDouble / 1e6} ms")
    }

    while (shouldKeepTrying) {
      val peer = peersForReplication.head
      try {
        uploadTo(peer)
        peersForReplication = peersForReplication.tail
        peersReplicatedTo += peer
      } catch {
        case NonFatal(e) =>
          logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
          peersFailedToReplicateTo += peer
          // After a failed replication the peer list is fetched again, leaving out the peers
          // that already hold a replica and the ones that failed previously.
          val remainingPeers = getPeers(true).filter { candidate =>
            !peersFailedToReplicateTo.contains(candidate) && !peersReplicatedTo.contains(candidate)
          }

          numFailures += 1
          peersForReplication = blockReplicationPolicy.prioritize(
            blockManagerId,
            remainingPeers,
            peersReplicatedTo,
            blockId,
            numPeersToReplicateTo - peersReplicatedTo.size)
      }
    }

    logDebug(s"Replicating $blockId of ${data.size} bytes to " +
      s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
    if (peersReplicatedTo.size < numPeersToReplicateTo) {
      logWarning(s"Block $blockId replicated to only " +
        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
    }

    logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
  }

  /**
   * Reads a block that holds exactly one object.
   *
   * @return the first element of the block's data, or None if `get` finds no such block.
   */
  def getSingle[T: ClassTag](blockId: BlockId): Option[T] = {
    get[T](blockId).map { result =>
      result.data.next().asInstanceOf[T]
    }
  }

  /**
   * Writes a block that holds exactly one object by wrapping `value` in a one-element iterator.
   *
   * @return true if the block was stored or false if the block was already stored or an
   *         error occurred.
   */
  def putSingle[T: ClassTag](
      blockId: BlockId,
      value: T,
      level: StorageLevel,
      tellMaster: Boolean = true): Boolean = {
    putIterator(blockId, Iterator.single(value), level, tellMaster)
  }

  /**
   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
   * store reaches its limit and needs to free up space.
   *
   * If `data` is not put on disk, it won't be created.
   *
   * The caller of this method must hold a write lock on the block before calling this method.
   * This method does not release the write lock.
   *
   * @return the block's new effective StorageLevel.
*/
  private[storage] override def dropFromMemory[T: ClassTag](
      blockId: BlockId,
      data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
    logInfo(s"Dropping block $blockId from memory")
    val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
    val level = info.level

    // Drop to disk if the storage level requires it and the block is not on disk already.
    // `data` is only invoked in that case.
    val writtenToDisk = level.useDisk && !diskStore.contains(blockId)
    if (writtenToDisk) {
      logInfo(s"Writing block $blockId to disk")
      data() match {
        case Left(elements) =>
          diskStore.put(blockId) { fileOutputStream =>
            serializerManager.dataSerializeStream(
              blockId,
              fileOutputStream,
              elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
          }
        case Right(bytes) =>
          diskStore.putBytes(blockId, bytes)
      }
    }

    // Actually drop from the memory store; the size is captured first so it can be reported.
    val droppedMemorySize =
      if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
    val removedFromMemory = memoryStore.remove(blockId)
    if (!removedFromMemory) {
      logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
    }

    val status = getCurrentBlockStatus(blockId, info)
    if (info.tellMaster) {
      reportBlockStatus(blockId, status, droppedMemorySize)
    }
    // Task metrics are only touched when the block was written to disk or left memory.
    if (writtenToDisk || removedFromMemory) {
      addUpdatedBlockStatusToTaskMetrics(blockId, status)
    }
    status.storageLevel
  }

  /**
   * Remove all blocks belonging to the given RDD. The master is not told about the removals.
   *
   * @return The number of blocks removed.
   */
  def removeRdd(rddId: Int): Int = {
    // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
    logInfo(s"Removing RDD $rddId")
    // The matches are traversed once to remove them and once more to count them. Materialize
    // them first so the count stays correct even if `entries` yields a single-pass or lazily
    // evaluated sequence (a consumed iterator would otherwise report 0 removed blocks).
    val blocksToRemove =
      blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId).toList
    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
    blocksToRemove.size
  }

  /**
   * Remove all blocks belonging to the given broadcast.
   *
   * @param tellMaster forwarded to [[removeBlock()]] for every removed block.
   * @return The number of blocks removed.
   */
  def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
    logDebug(s"Removing broadcast $broadcastId")
    // Materialized for the same reason as in removeRdd: the result is traversed and then counted.
    val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
      case bid @ BroadcastBlockId(`broadcastId`, _) => bid
    }.toList
    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
    blocksToRemove.size
  }

  /**
   * Remove a block from both memory and disk.
   *
   * @param tellMaster the master is only notified when both this flag and the block's own
   *                   `tellMaster` setting are true.
   */
  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
    logDebug(s"Removing block $blockId")
    blockInfoManager.lockForWriting(blockId) match {
      case None =>
        // The block has already been removed; do nothing.
        logWarning(s"Asked to remove block $blockId, which does not exist")
      case Some(info) =>
        removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
    }
  }

  /**
   * Internal version of [[removeBlock()]] which assumes that the caller already holds a write
   * lock on the block.
   */
  private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
    // Removals are idempotent in disk store and memory store. At worst, we get a warning.
    val removedFromMemory = memoryStore.remove(blockId)
    val removedFromDisk = diskStore.remove(blockId)
    if (!removedFromMemory && !removedFromDisk) {
      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
    }
    blockInfoManager.removeBlock(blockId)
    if (tellMaster) {
      reportBlockStatus(blockId, BlockStatus.empty)
    }
  }

  /**
   * Records an updated block status in the metrics of the currently running task.
   * Does nothing when `TaskContext.get()` returns null.
   */
  private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
    Option(TaskContext.get()).foreach { c =>
      c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
    }
  }

  /**
   * Shuts this block manager down: closes the transfer service and the shuffle client, stops
   * the disk block manager and the RPC endpoint, clears the block info manager and the memory
   * store, and shuts down the future execution context.
   */
  def stop(): Unit = {
    blockTransferService.close()
    if (shuffleClient ne blockTransferService) {
      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
      shuffleClient.close()
    }
    diskBlockManager.stop()
    rpcEnv.stop(slaveEndpoint)
    blockInfoManager.clear()
    memoryStore.clear()
    futureExecutionContext.shutdownNow()
    logInfo("BlockManager stopped")
  }
}


private[spark] object BlockManager {
  private val ID_GENERATOR = new IdGenerator

  /**
   * Maps every given block id to the hosts of the block managers that hold the block.
   *
   * @param blockIds the blocks to look up.
   * @param env used to reach the block manager when `blockManagerMaster` is null.
   * @param blockManagerMaster when non-null (used in tests), locations are fetched from it
   *                           instead of from `env`.
   * @return a map from block id to host names; if a block id occurs more than once, the entry
   *         for its last occurrence wins.
   */
  def blockIdsToHosts(
      blockIds: Array[BlockId],
      env: SparkEnv,
      blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {

    // blockManagerMaster != null is used in tests
    assert(env != null || blockManagerMaster != null)
    val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
      env.blockManager.getLocationBlockIds(blockIds)
    } else {
      blockManagerMaster.getLocations(blockIds)
    }

    // Locations are matched to block ids by position.
    val blockManagers = new HashMap[BlockId, Seq[String]]
    for (i <- blockIds.indices) {
      blockManagers(blockIds(i)) = blockLocations(i).map(_.host)
    }
    blockManagers.toMap
  }
}