1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.storage
19
20import java.io.{Externalizable, ObjectInput, ObjectOutput}
21
22import org.apache.spark.rpc.RpcEndpointRef
23import org.apache.spark.util.Utils
24
/**
 * Message definitions exchanged between the block manager master and its slaves.
 *
 * The messages are split by direction into two sealed hierarchies:
 * `ToBlockManagerSlave` (master to slave) and `ToBlockManagerMaster` (slave to master).
 * Everything here is a plain data carrier; the code that reacts to these messages lives
 * outside this file.
 */
private[spark] object BlockManagerMessages {

  //////////////////////////////////////////////////////////////////////////////////
  // Master -> slave messages.
  //////////////////////////////////////////////////////////////////////////////////

  /** Marker for every message that travels from the master to a slave. */
  sealed trait ToBlockManagerSlave

  /**
   * Remove a single block from the slaves that hold it. Only blocks the master knows
   * about can be removed this way.
   */
  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave

  /** Remove every block that belongs to the RDD identified by `rddId`. */
  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave

  /** Remove every block that belongs to the shuffle identified by `shuffleId`. */
  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave

  /**
   * Remove every block that belongs to the broadcast identified by `broadcastId`.
   *
   * `removeFromDriver` defaults to `true`.
   * NOTE(review): by its name it decides whether the driver's copy is removed as well;
   * confirm against the handler, which is not part of this file.
   */
  case class RemoveBroadcast(
      broadcastId: Long,
      removeFromDriver: Boolean = true)
    extends ToBlockManagerSlave

  /** Driver to Executor message that triggers a thread dump. */
  case object TriggerThreadDump extends ToBlockManagerSlave

  //////////////////////////////////////////////////////////////////////////////////
  // Slave -> master messages.
  //////////////////////////////////////////////////////////////////////////////////

  /** Marker for every message that travels from a slave to the master. */
  sealed trait ToBlockManagerMaster

  /**
   * Carries a block manager's id, a `maxMemSize` value and an RPC endpoint reference
   * (`sender`).
   * NOTE(review): the name suggests this announces a new block manager to the master;
   * the handling code is not visible here.
   */
  case class RegisterBlockManager(
      blockManagerId: BlockManagerId,
      maxMemSize: Long,
      sender: RpcEndpointRef)
    extends ToBlockManagerMaster

  /**
   * Carries a block's id, its storage level and its memory and disk sizes, together with
   * the id of a block manager.
   *
   * The class implements [[java.io.Externalizable]], so it is written and read field by
   * field through `writeExternal` / `readExternal` instead of default Java serialization.
   * That is why the fields are `var`s: the no-arg constructor creates an empty instance
   * and `readExternal` fills it in afterwards.
   */
  case class UpdateBlockInfo(
      var blockManagerId: BlockManagerId,
      var blockId: BlockId,
      var storageLevel: StorageLevel,
      var memSize: Long,
      var diskSize: Long)
    extends ToBlockManagerMaster with Externalizable {

    /** No-arg constructor required by `Externalizable`; for deserialization only. */
    def this() = this(null, null, null, 0L, 0L)

    /**
     * Writes the fields in wire order: block manager id, block id (as its `name` string),
     * storage level, memory size, disk size. `readExternal` must consume them in exactly
     * this order.
     */
    override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
      blockManagerId.writeExternal(out)
      // The block id is written as its name and rebuilt from that string on read.
      out.writeUTF(blockId.name)
      storageLevel.writeExternal(out)
      out.writeLong(memSize)
      out.writeLong(diskSize)
    }

    /** Restores the fields in the same order in which `writeExternal` emitted them. */
    override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
      blockManagerId = BlockManagerId(in)
      blockId = BlockId(in.readUTF())
      storageLevel = StorageLevel(in)
      memSize = in.readLong()
      diskSize = in.readLong()
    }
  }

  // NOTE(review): the descriptions of the request messages below are derived from their
  // names and fields only; the master-side handling is not visible in this file.

  /** Request keyed by one block id; by its name, asks for that block's locations. */
  case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster

  /**
   * Multi-block variant of `GetLocations`.
   *
   * The payload is an `Array`, and arrays compare by reference, so two instances built
   * from equal-but-distinct arrays are not `==` under the generated case-class equality.
   */
  case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster

  /** Request keyed by a block manager id; by its name, asks for that manager's peers. */
  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  /** Request keyed by an executor id; by its name, asks for that executor's endpoint ref. */
  case class GetExecutorEndpointRef(executorId: String) extends ToBlockManagerMaster

  /** Message keyed by an executor id; by its name, asks the master to drop that executor. */
  case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

  /** Parameterless message; by its name, tells the block manager master to stop. */
  case object StopBlockManagerMaster extends ToBlockManagerMaster

  /** Parameterless request; by its name, asks for memory status. */
  case object GetMemoryStatus extends ToBlockManagerMaster

  /** Parameterless request; by its name, asks for storage status. */
  case object GetStorageStatus extends ToBlockManagerMaster

  /**
   * Request keyed by one block id; by its name, asks for that block's status.
   * `askSlaves` defaults to `true`.
   */
  case class GetBlockStatus(
      blockId: BlockId,
      askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  /**
   * Request carrying a predicate over block ids; by its name, asks for the ids that the
   * predicate accepts. `askSlaves` defaults to `true`.
   *
   * NOTE(review): the predicate travels inside the message, so it presumably has to be
   * serializable whenever the message leaves the current process; confirm with callers.
   */
  case class GetMatchingBlockIds(
      filter: BlockId => Boolean,
      askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  /** Message keyed by a block manager id; by its name, a heartbeat from that manager. */
  case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  /** Request keyed by an executor id; by its name, asks whether it holds cached blocks. */
  case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster
}
115