/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import java.io._
import java.nio.ByteBuffer

import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.NonFatal

import com.google.common.io.ByteStreams

import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer


/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
    val data: Iterator[Any],
    val readMethod: DataReadMethod.Value,
    val bytes: Long)

/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that [[initialize()]] must be called before the BlockManager is usable.
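 *
 * A minimal usage sketch (hypothetical caller code; assumes a fully constructed BlockManager
 * obtained from SparkEnv and an already-known application id):
 * {{{
 *   val bm = SparkEnv.get.blockManager
 *   bm.initialize(appId)                                        // must run before any put/get
 *   bm.putSingle(TestBlockId("demo"), "hello", StorageLevel.MEMORY_ONLY)
 *   val restored = bm.getSingle[String](TestBlockId("demo"))    // Some("hello")
 * }}}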
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  private[spark] val externalShuffleServiceEnabled =
    conf.getBoolean("spark.shuffle.service.enabled", false)

  val diskBlockManager = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }

  // Visible for testing
  private[storage] val blockInfoManager = new BlockInfoManager

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

  // Actual storage of where blocks are kept
  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
  memoryManager.setMemoryStore(memoryStore)

  // Note: depending on the memory manager, `maxMemory` may actually vary over time.
  // However, since we use this only for reporting and logging, what we actually want here is
  // the absolute maximum value that `maxMemory` can ever possibly reach. We may need
  // to revisit whether reporting this value as the "max" is intuitive to the user.
  private val maxMemory =
    memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory

  // Port used by the external shuffle service. In Yarn mode, this may already be
  // set through the Hadoop configuration, as the server is launched in the Yarn NM.
  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
      // an open port. But we still need to tell our spark apps the right port to use. So
      // only if the yarn config has the port set to 0 do we prefer the value in the spark config.
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }

  var blockManagerId: BlockManagerId = _

  // Address of the server that serves this executor's shuffle files. This is either an external
  // service, or just our own Executor's BlockManager.
  private[spark] var shuffleServerId: BlockManagerId = _

  // Client to read other executors' shuffle files. This is either an external service, or just the
  // standard BlockTransferService to directly connect to other Executors.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
      securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }

  // Max number of failures before this block manager refreshes the block locations from the driver
  private val maxFailuresBeforeLocationRefresh =
    conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

  // Pending re-registration action being executed asynchronously or null if none is pending.
  // Accesses should synchronize on asyncReregisterLock.
  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object

  // Fields related to peer block managers that are necessary for block replication
  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L

  private var blockReplicationPolicy: BlockReplicationPolicy = _

  /**
   * Initializes the BlockManager with the given appId. This is not performed in the constructor as
   * the appId may not be known at BlockManager instantiation time (in particular for the driver,
   * where it is only learned after registration with the TaskScheduler).
   *
   * This method initializes the BlockTransferService and ShuffleClient, registers with the
   * BlockManagerMaster, starts the BlockManagerWorker endpoint, and registers with a local shuffle
   * service if configured.
   */
  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

    val idFromMaster = master.registerBlockManager(
      id,
      maxMemory,
      slaveEndpoint)

    blockManagerId = if (idFromMaster != null) idFromMaster else id

    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    // Register the executor's configuration with the local shuffle service, if one exists.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

  private def registerWithExternalShuffleServer(): Unit = {
    logInfo("Registering executor with local external shuffle service.")
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirs.map(_.toString),
      diskBlockManager.subDirsPerLocalDir,
      shuffleManager.getClass.getName)

    val MAX_ATTEMPTS = 3
    val SLEEP_TIME_SECS = 5

    for (i <- 1 to MAX_ATTEMPTS) {
      try {
        // Synchronous and will throw an exception if we cannot connect.
        shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
          shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
        return
      } catch {
        case e: Exception if i < MAX_ATTEMPTS =>
          logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
            + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
          Thread.sleep(SLEEP_TIME_SECS * 1000)
        case NonFatal(e) =>
          throw new SparkException("Unable to register with external shuffle server due to: " +
            e.getMessage, e)
      }
    }
  }

  /**
   * Report all blocks to the master again. This may be necessary if we are dropped
   * by the master and come back, or if we become capable of recovering blocks on disk after
   * an executor crash.
   *
   * This function deliberately fails silently if the master returns false (indicating that
   * the slave needs to re-register). The error condition will be detected again by the next
   * heartbeat attempt or new block registration, and another attempt to re-register all blocks
   * will be made then.
   */
  private def reportAllBlocks(): Unit = {
    logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
    for ((blockId, info) <- blockInfoManager.entries) {
      val status = getCurrentBlockStatus(blockId, info)
      if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
        logError(s"Failed to report $blockId to master; giving up.")
        return
      }
    }
  }

  /**
   * Re-register with the master and report all blocks to it. This will be called by the
   * heartbeat thread if our heartbeat to the master indicates that we were not registered.
   *
   * Note that this method must be called without any BlockInfo locks held.
   */
  def reregister(): Unit = {
    // TODO: We might need to rate limit re-registering.
    logInfo(s"BlockManager $blockManagerId re-registering with master")
    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
    reportAllBlocks()
  }

  /**
   * Re-register with the master sometime soon.
   */
  private def asyncReregister(): Unit = {
    asyncReregisterLock.synchronized {
      if (asyncReregisterTask == null) {
        asyncReregisterTask = Future[Unit] {
          // This is a blocking action and should run in futureExecutionContext which is a cached
          // thread pool
          reregister()
          asyncReregisterLock.synchronized {
            asyncReregisterTask = null
          }
        }(futureExecutionContext)
      }
    }
  }

  /**
   * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
   */
  def waitForAsyncReregister(): Unit = {
    val task = asyncReregisterTask
    if (task != null) {
      try {
        Await.ready(task, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for async. reregistration", t)
      }
    }
  }

  /**
   * Interface to get local block data. Throws an exception if the block cannot be found or
   * cannot be read successfully.
   */
  override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
    } else {
      getLocalBytes(blockId) match {
        case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
        case None =>
          // If this block manager receives a request for a block that it doesn't have then it's
          // likely that the master has outdated block statuses for this block. Therefore, we send
          // an RPC so that this block is marked as being unavailable from this block manager.
          reportBlockStatus(blockId, BlockStatus.empty)
          throw new BlockNotFoundException(blockId.toString)
      }
    }
  }

  /**
   * Put the block locally, using the given storage level.
   *
   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
   * so may corrupt or change the data stored by the `BlockManager`.
   */
  override def putBlockData(
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Boolean = {
    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
  }

  /**
   * Get the BlockStatus for the block identified by the given ID, if it exists.
   * NOTE: This is mainly for testing.
   */
  def getStatus(blockId: BlockId): Option[BlockStatus] = {
    blockInfoManager.get(blockId).map { info =>
      val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
      BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
    }
  }

  /**
   * Get the ids of existing blocks that match the given filter. Note that this will
   * query the blocks stored in the disk block manager (that the block manager
   * may not know of).
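   *
   * For example, to list all broadcast blocks known to this node (illustrative only):
   * {{{
   *   val broadcastBlocks = blockManager.getMatchingBlockIds(_.isInstanceOf[BroadcastBlockId])
   * }}}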
   */
  def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
    // The `toArray` is necessary here in order to force the list to be materialized so that we
    // don't try to serialize a lazy iterator when responding to client requests.
    (blockInfoManager.entries.map(_._1) ++ diskBlockManager.getAllBlocks())
      .filter(filter)
      .toArray
      .toSeq
  }

  /**
   * Tell the master about the current storage status of a block. This will send a block update
   * message reflecting the current status, *not* the desired storage level in its block info.
   * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
   *
   * droppedMemorySize exists to account for when the block is dropped from memory to disk (so
   * it is still valid). This ensures that the update in the master will compensate for the
   * increase in memory on the slave.
   */
  private def reportBlockStatus(
      blockId: BlockId,
      status: BlockStatus,
      droppedMemorySize: Long = 0L): Unit = {
    val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
    if (needReregister) {
      logInfo(s"Got told to re-register updating block $blockId")
      // Re-registering will report our new block for free.
      asyncReregister()
    }
    logDebug(s"Told master about block $blockId")
  }

  /**
   * Actually send an UpdateBlockInfo message. Returns the master's response,
   * which will be true if the block was successfully recorded and false if
   * the slave needs to re-register.
   */
  private def tryToReportBlockStatus(
      blockId: BlockId,
      status: BlockStatus,
      droppedMemorySize: Long = 0L): Boolean = {
    val storageLevel = status.storageLevel
    val inMemSize = Math.max(status.memSize, droppedMemorySize)
    val onDiskSize = status.diskSize
    master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
  }

  /**
   * Return the updated storage status of the block with the given ID. More specifically, if
   * the block is dropped from memory and possibly added to disk, return the new storage level
   * and the updated in-memory and on-disk sizes.
   */
  private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
    info.synchronized {
      info.level match {
        case null =>
          BlockStatus.empty
        case level =>
          val inMem = level.useMemory && memoryStore.contains(blockId)
          val onDisk = level.useDisk && diskStore.contains(blockId)
          val deserialized = if (inMem) level.deserialized else false
          val replication = if (inMem || onDisk) level.replication else 1
          val storageLevel = StorageLevel(
            useDisk = onDisk,
            useMemory = inMem,
            useOffHeap = level.useOffHeap,
            deserialized = deserialized,
            replication = replication)
          val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
          val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
          BlockStatus(storageLevel, memSize, diskSize)
      }
    }
  }

  /**
   * Get locations of an array of blocks.
   */
  private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
    val startTimeMs = System.currentTimeMillis
    val locations = master.getLocations(blockIds).toArray
    logDebug("Got multiple block locations in %s".format(Utils.getUsedTimeMs(startTimeMs)))
    locations
  }

  /**
   * Cleanup code run in response to a failed local read.
   * Must be called while holding a read lock on the block.
   */
  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
    releaseLock(blockId)
    // Remove the missing block so that its unavailability is reported to the driver
    removeBlock(blockId)
    throw new SparkException(s"Block $blockId was not found even though it's read-locked")
  }

  /**
   * Get block from local block manager as an iterator of Java objects.
   */
  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
    logDebug(s"Getting local block $blockId")
    blockInfoManager.lockForReading(blockId) match {
      case None =>
        logDebug(s"Block $blockId was not found")
        None
      case Some(info) =>
        val level = info.level
        logDebug(s"Level for block $blockId is $level")
        if (level.useMemory && memoryStore.contains(blockId)) {
          val iter: Iterator[Any] = if (level.deserialized) {
            memoryStore.getValues(blockId).get
          } else {
            serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
        } else if (level.useDisk && diskStore.contains(blockId)) {
          val iterToReturn: Iterator[Any] = {
            val diskBytes = diskStore.getBytes(blockId)
            if (level.deserialized) {
              val diskValues = serializerManager.dataDeserializeStream(
                blockId,
                diskBytes.toInputStream(dispose = true))(info.classTag)
              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
            } else {
              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
                .map {_.toInputStream(dispose = false)}
                .getOrElse { diskBytes.toInputStream(dispose = true) }
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
            }
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
        } else {
          handleLocalReadFailure(blockId)
        }
    }
  }

  /**
   * Get block from the local block manager as serialized bytes.
   */
  def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting local block $blockId as bytes")
    // As an optimization for map output fetches, if the block is for a shuffle, return it
    // without acquiring a lock; the disk store never deletes (recent) items so this should work
    if (blockId.isShuffle) {
      val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
      // TODO: This should gracefully handle case where local block is not available. Currently
      // downstream code will throw an exception.
      Option(
        new ChunkedByteBuffer(
          shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
    } else {
      blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
    }
  }

  /**
   * Get block from the local block manager as serialized bytes.
   *
   * Must be called while holding a read lock on the block.
   * Releases the read lock upon exception; keeps the read lock upon successful return.
   */
  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
    val level = info.level
    logDebug(s"Level for block $blockId is $level")
    // In order, try to read the serialized bytes from memory, then from disk, then fall back to
    // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
    if (level.deserialized) {
      // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
      if (level.useDisk && diskStore.contains(blockId)) {
        // Note: we purposely do not try to put the block back into memory here. Since this branch
        // handles deserialized blocks, this block may only be cached in memory as objects, not
        // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
        // cache the block's deserialized objects since that caching may not have a payoff.
        diskStore.getBytes(blockId)
      } else if (level.useMemory && memoryStore.contains(blockId)) {
        // The block was not found on disk, so serialize an in-memory copy:
        serializerManager.dataSerializeWithExplicitClassTag(
          blockId, memoryStore.getValues(blockId).get, info.classTag)
      } else {
        handleLocalReadFailure(blockId)
      }
    } else {  // storage level is serialized
      if (level.useMemory && memoryStore.contains(blockId)) {
        memoryStore.getBytes(blockId).get
      } else if (level.useDisk && diskStore.contains(blockId)) {
        val diskBytes = diskStore.getBytes(blockId)
        maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
      } else {
        handleLocalReadFailure(blockId)
      }
    }
  }

  /**
   * Get block from remote block managers.
   *
   * This does not acquire a lock on this block in this JVM.
   */
  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val ct = implicitly[ClassTag[T]]
    getRemoteBytes(blockId).map { data =>
      val values =
        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }

  /**
   * Return a list of locations for the given block, prioritizing the local machine since
   * multiple block managers can share the same host.
   */
  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
    val locs = Random.shuffle(master.getLocations(blockId))
    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
    preferredLocs ++ otherLocs
  }

  /**
   * Get block from remote block managers as serialized bytes.
   */
  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    var runningFailureCount = 0
    var totalFailureCount = 0
    val locations = getLocations(blockId)
    val maxFetchFailures = locations.size
    var locationIterator = locations.iterator
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      logDebug(s"Getting remote block $blockId from $loc")
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
      } catch {
        case NonFatal(e) =>
          runningFailureCount += 1
          totalFailureCount += 1

          if (totalFailureCount >= maxFetchFailures) {
            // Give up trying any more locations. Either we've tried all of the original locations,
            // or we've refreshed the list of locations from the master, and have still
            // hit failures after trying locations from the refreshed list.
            logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
              s"Most recent failure cause:", e)
            return None
          }

          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $runningFailureCount)", e)

          // If there is a large number of executors, then the locations list can contain a
          // large number of stale entries, causing many retries that may take a significant
          // amount of time. To get rid of these stale entries we refresh the block locations
          // after a certain number of fetch failures.
          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
            locationIterator = getLocations(blockId).iterator
            logDebug(s"Refreshed locations from the driver " +
              s"after ${runningFailureCount} fetch failures.")
            runningFailureCount = 0
          }

          // This location failed, so we retry fetch from a different one by returning null here
          null
      }

      if (data != null) {
        return Some(new ChunkedByteBuffer(data))
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }

  /**
   * Get a block from the block manager (either local or remote).
   *
   * This acquires a read lock on the block if the block was stored locally and does not acquire
   * any locks if the block was fetched from a remote block manager. The read lock will
   * automatically be freed once the result's `data` iterator is fully consumed.
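   *
   * A hedged usage sketch (the block id is illustrative):
   * {{{
   *   blockManager.get[String](TestBlockId("demo")).foreach { result =>
   *     result.data.foreach(println)   // consuming the iterator releases any local read lock
   *   }
   * }}}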
   */
  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val local = getLocalValues(blockId)
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    val remote = getRemoteValues[T](blockId)
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }

  /**
   * Downgrades an exclusive write lock to a shared read lock.
   */
  def downgradeLock(blockId: BlockId): Unit = {
    blockInfoManager.downgradeLock(blockId)
  }

  /**
   * Release a lock on the given block.
   */
  def releaseLock(blockId: BlockId): Unit = {
    blockInfoManager.unlock(blockId)
  }

  /**
   * Registers a task with the BlockManager in order to initialize per-task bookkeeping structures.
   */
  def registerTask(taskAttemptId: Long): Unit = {
    blockInfoManager.registerTask(taskAttemptId)
  }

  /**
   * Release all locks for the given task.
   *
   * @return the blocks whose locks were released.
   */
  def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = {
    blockInfoManager.releaseAllLocksForTask(taskAttemptId)
  }

  /**
   * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
   * to compute the block, persist it, and return its values.
   *
   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
   *         could not be cached.
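   *
   * A hedged usage sketch (the block id, storage level, and values are illustrative):
   * {{{
   *   blockManager.getOrElseUpdate(
   *     RDDBlockId(rddId = 0, splitIndex = 0),
   *     StorageLevel.MEMORY_AND_DISK,
   *     implicitly[ClassTag[Int]],
   *     () => Iterator(1, 2, 3)) match {
   *     case Left(blockResult) => blockResult.data   // cached: read-locked iterator
   *     case Right(iter) => iter                      // not cached: caller consumes directly
   *   }
   * }}}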
   */
  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // Attempt to read the block from local or remote storage. If it's present, then we don't need
    // to go through the local-get-or-put path.
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
        // Need to compute the block.
    }
    // Initially we hold no locks on this block.
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        // doPut() didn't hand work back to us, so the block already existed or was successfully
        // stored. Therefore, we now hold a read lock on the block.
        val blockResult = getLocalValues(blockId).getOrElse {
          // Since we held a read lock between the doPut() and get() calls, the block should not
          // have been evicted, so get() not returning the block indicates some internal error.
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        // We already hold a read lock on the block from the doPut() call and getLocalValues()
        // acquires the lock again, so we need to call releaseLock() here so that the net number
        // of lock acquisitions is 1 (since the caller will only call release() once).
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
        // that they can decide what to do with the values (e.g. process them without caching).
        Right(iter)
    }
  }

  /**
   * @return true if the block was stored or false if an error occurred.
   */
  def putIterator[T: ClassTag](
      blockId: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      tellMaster: Boolean = true): Boolean = {
    require(values != null, "Values is null")
    doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match {
      case None =>
        true
      case Some(iter) =>
        // Caller doesn't care about the iterator values, so we can close the iterator here
        // to free resources earlier
        iter.close()
        false
    }
  }

  /**
   * A short circuited method to get a block writer that can write data directly to disk.
   * The Block will be appended to the File specified by filename. Callers should handle error
   * cases.
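   *
   * A hedged usage sketch (the block id, file, serializer instance, and write metrics are
   * caller-provided; the commit call on the returned writer is assumed):
   * {{{
   *   val writer = blockManager.getDiskWriter(blockId, file, serInstance, 32 * 1024, writeMetrics)
   *   records.foreach { case (k, v) => writer.write(k, v) }
   *   val segment = writer.commitAndGet()   // file segment describing the bytes written
   *   writer.close()
   * }}}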
   */
  def getDiskWriter(
      blockId: BlockId,
      file: File,
      serializerInstance: SerializerInstance,
      bufferSize: Int,
      writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
    val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
    new DiskBlockObjectWriter(file, serializerManager, serializerInstance, bufferSize,
      syncWrites, writeMetrics, blockId)
  }

  /**
   * Put a new block of serialized bytes to the block manager.
   *
   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
   * so may corrupt or change the data stored by the `BlockManager`.
   *
   * @param encrypt If true, asks the block manager to encrypt the data block before storing,
   *                when I/O encryption is enabled. This is required for blocks that have been
   *                read from unencrypted sources, since all the BlockManager read APIs
   *                automatically do decryption.
   * @return true if the block was stored or false if an error occurred.
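   *
   * A hedged usage sketch (`broadcastBytes` is a caller-owned ChunkedByteBuffer):
   * {{{
   *   blockManager.putBytes(
   *     BroadcastBlockId(0L), broadcastBytes, StorageLevel.MEMORY_AND_DISK_SER, encrypt = true)
   * }}}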
   */
  def putBytes[T: ClassTag](
      blockId: BlockId,
      bytes: ChunkedByteBuffer,
      level: StorageLevel,
      tellMaster: Boolean = true,
      encrypt: Boolean = false): Boolean = {
    require(bytes != null, "Bytes is null")

    val bytesToStore =
      if (encrypt && securityManager.ioEncryptionKey.isDefined) {
        try {
          val data = bytes.toByteBuffer
          val in = new ByteBufferInputStream(data)
          val byteBufOut = new ByteBufferOutputStream(data.remaining())
          val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
            securityManager.ioEncryptionKey.get)
          try {
            ByteStreams.copy(in, out)
          } finally {
            in.close()
            out.close()
          }
          new ChunkedByteBuffer(byteBufOut.toByteBuffer)
        } finally {
          bytes.dispose()
        }
      } else {
        bytes
      }

    doPutBytes(blockId, bytesToStore, level, implicitly[ClassTag[T]], tellMaster)
  }

  /**
   * Put the given bytes according to the given level in one of the block stores, replicating
   * the values if necessary.
   *
   * If the block already exists, this method will not overwrite it.
   *
   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
   * so may corrupt or change the data stored by the `BlockManager`.
   *
   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
   *                     block already exists). If false, this method will hold no locks when it
   *                     returns.
   * @return true if the block was already present or if the put succeeded, false otherwise.
   */
  private def doPutBytes[T](
      blockId: BlockId,
      bytes: ChunkedByteBuffer,
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Boolean = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      // Since we're storing bytes, initiate the replication before storing them locally.
      // This is faster as data is already serialized and ready to send.
      val replicationFuture = if (level.replication > 1) {
        Future {
          // This is a blocking action and should run in futureExecutionContext which is a cached
          // thread pool
          replicate(blockId, bytes, level, classTag)
        }(futureExecutionContext)
      } else {
        null
      }

      val size = bytes.size

      if (level.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        val putSucceeded = if (level.deserialized) {
          val values =
            serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
          memoryStore.putIteratorAsValues(blockId, values, classTag) match {
            case Right(_) => true
            case Left(iter) =>
              // If putting deserialized values in memory failed, we will put the bytes directly to
              // disk, so we don't need this iterator and can close it to free resources earlier.
              iter.close()
              false
          }
        } else {
          val memoryMode = level.memoryMode
          memoryStore.putBytes(blockId, size, memoryMode, () => {
            if (memoryMode == MemoryMode.OFF_HEAP &&
                bytes.chunks.exists(buffer => !buffer.isDirect)) {
              bytes.copy(Platform.allocateDirectBuffer)
            } else {
              bytes
            }
          })
        }
        if (!putSucceeded && level.useDisk) {
          logWarning(s"Persisting block $blockId to disk instead.")
          diskStore.putBytes(blockId, bytes)
        }
      } else if (level.useDisk) {
        diskStore.putBytes(blockId, bytes)
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory or disk store,
        // tell the master about it.
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
      }
      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
      if (level.replication > 1) {
        // Wait for asynchronous replication to finish
        try {
          Await.ready(replicationFuture, Duration.Inf)
        } catch {
          case NonFatal(t) =>
            throw new Exception("Error occurred while waiting for replication to finish", t)
        }
      }
      if (blockWasSuccessfullyStored) {
        None
      } else {
        Some(bytes)
      }
    }.isEmpty
  }

  /**
   * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
   *
   * @param putBody a function which attempts the actual put() and returns None on success
   *                or Some on failure.
   */
  private def doPut[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[_],
      tellMaster: Boolean,
      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {

    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")

    val putBlockInfo = {
      val newInfo = new BlockInfo(level, classTag, tellMaster)
      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
        newInfo
      } else {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        if (!keepReadLock) {
          // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
          releaseLock(blockId)
        }
        return None
      }
    }

    val startTimeMs = System.currentTimeMillis
    var exceptionWasThrown: Boolean = true
    val result: Option[T] = try {
      val res = putBody(putBlockInfo)
      exceptionWasThrown = false
      if (res.isEmpty) {
        // the block was successfully stored
        if (keepReadLock) {
          blockInfoManager.downgradeLock(blockId)
        } else {
          blockInfoManager.unlock(blockId)
        }
      } else {
        removeBlockInternal(blockId, tellMaster = false)
        logWarning(s"Putting block $blockId failed")
      }
      res
    } finally {
      // This cleanup is performed in a finally block rather than a `catch` to avoid having to
      // catch and properly re-throw InterruptedException.
      if (exceptionWasThrown) {
        logWarning(s"Putting block $blockId failed due to an exception")
        // If an exception was thrown then it's possible that the code in `putBody` has already
        // notified the master about the availability of this block, so we need to send an update
        // to remove this block location.
        removeBlockInternal(blockId, tellMaster = tellMaster)
        // The `putBody` code may have also added a new block status to TaskMetrics, so we need
        // to cancel that out by overwriting it with an empty block status. We only do this if
        // the finally block was entered via an exception because doing this unconditionally would
        // cause us to send empty block statuses for every block that failed to be cached due to
        // a memory shortage (which is an expected failure, unlike an uncaught exception).
        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
      }
    }
    if (level.replication > 1) {
      logDebug("Putting block %s with replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    } else {
      logDebug("Putting block %s without replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    }
    result
  }

  /**
   * Put the given block according to the given level in one of the block stores, replicating
   * the values if necessary.
   *
   * If the block already exists, this method will not overwrite it.
   *
   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
   *                     block already exists). If false, this method will hold no locks when it
   *                     returns.
   * @return None if the block was already present or if the put succeeded, or Some(iterator)
   *         if the put failed.
   */
  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
      // Size of the block in bytes
      var size = 0L
      if (level.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        if (level.deserialized) {
          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
            case Right(s) =>
              size = s
            case Left(iter) =>
              // Not enough space to unroll this block; drop to disk if applicable
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { fileOutputStream =>
                  serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(iter)
              }
          }
        } else { // !level.deserialized
          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
            case Right(s) =>
              size = s
            case Left(partiallySerializedValues) =>
              // Not enough space to unroll this block; drop to disk if applicable
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { fileOutputStream =>
                  partiallySerializedValues.finishWritingToStream(fileOutputStream)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
              }
          }
        }

      } else if (level.useDisk) {
        diskStore.put(blockId) { fileOutputStream =>
          serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
        }
        size = diskStore.getSize(blockId)
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory or disk store, tell the master about it.
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          // [SPARK-16550] Erase the typed classTag when using default serialization, since
          // NettyBlockRpcServer crashes when deserializing repl-defined classes.
          // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
          val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
            scala.reflect.classTag[Any]
          } else {
            classTag
          }
          try {
            replicate(blockId, bytesToReplicate, level, remoteClassTag)
          } finally {
            bytesToReplicate.unmap()
          }
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }
      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
      iteratorFromFailedMemoryStorePut
    }
  }

  /**
   * Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
   * subsequent reads. This method requires the caller to hold a read lock on the block.
   *
   * @return a copy of the bytes from the memory store if the put succeeded, otherwise None.
   *         If this returns bytes from the memory store then the original disk store bytes will
   *         automatically be disposed and the caller should not continue to use them. Otherwise,
   *         if this returns None then the original disk store bytes will be unaffected.
   */
  private def maybeCacheDiskBytesInMemory(
      blockInfo: BlockInfo,
      blockId: BlockId,
      level: StorageLevel,
      diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
    require(!level.deserialized)
    if (level.useMemory) {
      // Synchronize on blockInfo to guard against a race condition where two readers both try to
      // put values read from disk into the MemoryStore.
      blockInfo.synchronized {
        if (memoryStore.contains(blockId)) {
          diskBytes.dispose()
          Some(memoryStore.getBytes(blockId).get)
        } else {
          val allocator = level.memoryMode match {
            case MemoryMode.ON_HEAP => ByteBuffer.allocate _
            case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
          }
          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
            // https://issues.apache.org/jira/browse/SPARK-6076
            // If the file size is bigger than the free memory, an OOM can occur. So if we cannot
            // put the bytes into the MemoryStore, the in-memory copy should not be created at all.
            // That's why this action is wrapped in a `() => ChunkedByteBuffer` and created lazily.
            diskBytes.copy(allocator)
          })
          if (putSucceeded) {
            diskBytes.dispose()
            Some(memoryStore.getBytes(blockId).get)
          } else {
            None
          }
        }
      }
    } else {
      None
    }
  }

  /**
   * Attempts to cache spilled values read from disk into the MemoryStore in order to speed up
   * subsequent reads. This method requires the caller to hold a read lock on the block.
   *
   * @return a copy of the iterator. The original iterator passed to this method should no longer
   *         be used after this method returns.
   */
  private def maybeCacheDiskValuesInMemory[T](
      blockInfo: BlockInfo,
      blockId: BlockId,
      level: StorageLevel,
      diskIterator: Iterator[T]): Iterator[T] = {
    require(level.deserialized)
    val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]]
    if (level.useMemory) {
      // Synchronize on blockInfo to guard against a race condition where two readers both try to
      // put values read from disk into the MemoryStore.
      blockInfo.synchronized {
        if (memoryStore.contains(blockId)) {
          // Note: if we had a means to discard the disk iterator, we would do that here.
          memoryStore.getValues(blockId).get
        } else {
          memoryStore.putIteratorAsValues(blockId, diskIterator, classTag) match {
            case Left(iter) =>
              // The memory store put() failed, so it returned the iterator back to us:
              iter
            case Right(_) =>
              // The put() succeeded, so we can read the values back:
              memoryStore.getValues(blockId).get
          }
        }
      }.asInstanceOf[Iterator[T]]
    } else {
      diskIterator
    }
  }

  /**
   * Get peer block managers in the system.
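   *
   * The result is cached; per the code below, the cache is refreshed when `forceFetch` is set or
   * after `spark.storage.cachedPeersTtl` milliseconds (60000 by default) have elapsed.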
   */
  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
    peerFetchLock.synchronized {
      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
      if (cachedPeers == null || forceFetch || timeout) {
        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
        lastPeerFetchTime = System.currentTimeMillis
        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
      }
      cachedPeers
    }
  }

  /**
   * Replicate block to another node. Note that this is a blocking call that returns after
   * the block has been replicated.
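   *
   * Per the code below, the peer list is re-fetched after a failed attempt, and at most
   * `spark.storage.maxReplicationFailures` failures (1 by default) are tolerated.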
   */
  private def replicate(
      blockId: BlockId,
      data: ChunkedByteBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Unit = {

    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    val tLevel = StorageLevel(
      useDisk = level.useDisk,
      useMemory = level.useMemory,
      useOffHeap = level.useOffHeap,
      deserialized = level.deserialized,
      replication = 1)

    val numPeersToReplicateTo = level.replication - 1

    val startTime = System.nanoTime

    var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
    var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
    var numFailures = 0

    var peersForReplication = blockReplicationPolicy.prioritize(
      blockManagerId,
      getPeers(false),
      mutable.HashSet.empty,
      blockId,
      numPeersToReplicateTo)

    while (numFailures <= maxReplicationFailures &&
        !peersForReplication.isEmpty &&
        peersReplicatedTo.size != numPeersToReplicateTo) {
      val peer = peersForReplication.head
      try {
        val onePeerStartTime = System.nanoTime
        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
        blockTransferService.uploadBlockSync(
          peer.host,
          peer.port,
          peer.executorId,
          blockId,
          new NettyManagedBuffer(data.toNetty),
          tLevel,
          classTag)
        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
          s" in ${(System.nanoTime - onePeerStartTime).toDouble / 1e6} ms")
        peersForReplication = peersForReplication.tail
        peersReplicatedTo += peer
      } catch {
        case NonFatal(e) =>
          logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
          peersFailedToReplicateTo += peer
          // we have a failed replication, so we get the list of peers again
          // we don't want peers we have already replicated to and the ones that
          // have failed previously
          val filteredPeers = getPeers(true).filter { p =>
            !peersFailedToReplicateTo.contains(p) && !peersReplicatedTo.contains(p)
          }

          numFailures += 1
          peersForReplication = blockReplicationPolicy.prioritize(
            blockManagerId,
            filteredPeers,
            peersReplicatedTo,
            blockId,
            numPeersToReplicateTo - peersReplicatedTo.size)
      }
    }

    logDebug(s"Replicating $blockId of ${data.size} bytes to " +
      s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
    if (peersReplicatedTo.size < numPeersToReplicateTo) {
      logWarning(s"Block $blockId replicated to only " +
        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
    }

    logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
  }

  /**
   * Read a block consisting of a single object.
   */
  def getSingle[T: ClassTag](blockId: BlockId): Option[T] = {
    get[T](blockId).map(_.data.next().asInstanceOf[T])
  }

  /**
   * Write a block consisting of a single object.
   *
   * @return true if the block was stored or false if the block was already stored or an
   *         error occurred.
   */
  def putSingle[T: ClassTag](
      blockId: BlockId,
      value: T,
      level: StorageLevel,
      tellMaster: Boolean = true): Boolean = {
    putIterator(blockId, Iterator(value), level, tellMaster)
  }

  /**
   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
   * store reaches its limit and needs to free up space.
   *
   * If `data` is not put on disk, it won't be created.
   *
   * The caller of this method must hold a write lock on the block before calling this method.
   * This method does not release the write lock.
   *
   * @return the block's new effective StorageLevel.
   */
  private[storage] override def dropFromMemory[T: ClassTag](
      blockId: BlockId,
      data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
    logInfo(s"Dropping block $blockId from memory")
    val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
    var blockIsUpdated = false
    val level = info.level

    // Drop to disk, if storage level requires
    if (level.useDisk && !diskStore.contains(blockId)) {
      logInfo(s"Writing block $blockId to disk")
      data() match {
        case Left(elements) =>
          diskStore.put(blockId) { fileOutputStream =>
            serializerManager.dataSerializeStream(
              blockId,
              fileOutputStream,
              elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
          }
        case Right(bytes) =>
          diskStore.putBytes(blockId, bytes)
      }
      blockIsUpdated = true
    }

    // Actually drop from memory store
    val droppedMemorySize =
      if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
    val blockIsRemoved = memoryStore.remove(blockId)
    if (blockIsRemoved) {
      blockIsUpdated = true
    } else {
      logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
    }

    val status = getCurrentBlockStatus(blockId, info)
    if (info.tellMaster) {
      reportBlockStatus(blockId, status, droppedMemorySize)
    }
    if (blockIsUpdated) {
      addUpdatedBlockStatusToTaskMetrics(blockId, status)
    }
    status.storageLevel
  }

  /**
   * Remove all blocks belonging to the given RDD.
   *
   * @return The number of blocks removed.
   */
  def removeRdd(rddId: Int): Int = {
    // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
    logInfo(s"Removing RDD $rddId")
    val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
    blocksToRemove.size
  }

  /**
   * Remove all blocks belonging to the given broadcast.
   */
  def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
    logDebug(s"Removing broadcast $broadcastId")
    val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
      case bid @ BroadcastBlockId(`broadcastId`, _) => bid
    }
    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
    blocksToRemove.size
  }

  /**
   * Remove a block from both memory and disk.
   */
  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
    logDebug(s"Removing block $blockId")
    blockInfoManager.lockForWriting(blockId) match {
      case None =>
        // The block has already been removed; do nothing.
        logWarning(s"Asked to remove block $blockId, which does not exist")
      case Some(info) =>
        removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
    }
  }

  /**
   * Internal version of [[removeBlock()]] which assumes that the caller already holds a write
   * lock on the block.
   */
  private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
    // Removals are idempotent in disk store and memory store. At worst, we get a warning.
    val removedFromMemory = memoryStore.remove(blockId)
    val removedFromDisk = diskStore.remove(blockId)
    if (!removedFromMemory && !removedFromDisk) {
      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
    }
    blockInfoManager.removeBlock(blockId)
    if (tellMaster) {
      reportBlockStatus(blockId, BlockStatus.empty)
    }
  }

  private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
    Option(TaskContext.get()).foreach { c =>
      c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
    }
  }

  def stop(): Unit = {
    blockTransferService.close()
    if (shuffleClient ne blockTransferService) {
      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
      shuffleClient.close()
    }
    diskBlockManager.stop()
    rpcEnv.stop(slaveEndpoint)
    blockInfoManager.clear()
    memoryStore.clear()
    futureExecutionContext.shutdownNow()
    logInfo("BlockManager stopped")
  }
}


private[spark] object BlockManager {
  private val ID_GENERATOR = new IdGenerator

  def blockIdsToHosts(
      blockIds: Array[BlockId],
      env: SparkEnv,
      blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {

    // blockManagerMaster != null is used in tests
    assert(env != null || blockManagerMaster != null)
    val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
      env.blockManager.getLocationBlockIds(blockIds)
    } else {
      blockManagerMaster.getLocations(blockIds)
    }

    val blockManagers = new HashMap[BlockId, Seq[String]]
    for (i <- 0 until blockIds.length) {
      blockManagers(blockIds(i)) = blockLocations(i).map(_.host)
    }
    blockManagers.toMap
  }
}