1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.storage
19
20import java.util.UUID
21
22import org.apache.spark.annotation.DeveloperApi
23
/**
 * :: DeveloperApi ::
 * Identifies a particular Block of data, usually associated with a single file.
 * A Block can be uniquely identified by its filename, but each type of Block has a different
 * set of keys which produce its unique name.
 *
 * Equality, hashing and `toString` are all derived from `name`: two ids are equal only when
 * they are instances of the same concrete class and report the same name.
 *
 * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
 */
@DeveloperApi
sealed abstract class BlockId {
  /** A globally unique identifier for this Block. Can be used for ser/de. */
  def name: String

  // ---- convenience type tests ----

  /** This id narrowed to an [[RDDBlockId]], or `None` for every other kind of block. */
  def asRDDId: Option[RDDBlockId] = this match {
    case rdd: RDDBlockId => Some(rdd)
    case _ => None
  }

  /** Whether this id is an [[RDDBlockId]]. */
  def isRDD: Boolean = this match {
    case _: RDDBlockId => true
    case _ => false
  }

  /**
   * Whether this id is a [[ShuffleBlockId]]. [[ShuffleDataBlockId]] and [[ShuffleIndexBlockId]]
   * are separate classes and therefore yield `false` here.
   */
  def isShuffle: Boolean = this match {
    case _: ShuffleBlockId => true
    case _ => false
  }

  /** Whether this id is a [[BroadcastBlockId]]. */
  def isBroadcast: Boolean = this match {
    case _: BroadcastBlockId => true
    case _ => false
  }

  override def toString: String = name

  // hashCode and equals are both keyed on `name`, so they stay consistent with each other.
  override def hashCode: Int = name.hashCode

  override def equals(other: Any): Boolean = other match {
    case that: BlockId if getClass == that.getClass => name.equals(that.name)
    case _ => false
  }
}
50
/** Block id keyed by an RDD id and a split index; named `rdd_<rddId>_<splitIndex>`. */
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = s"rdd_${rddId}_$splitIndex"
}
55
/**
 * Shuffle block id, named `shuffle_<shuffleId>_<mapId>_<reduceId>`.
 *
 * The naming format of shuffle block ids (including the `.data` and `.index` variants) must be
 * kept in sync with org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 */
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}
62
/** Shuffle data block id, named `shuffle_<shuffleId>_<mapId>_<reduceId>.data`. */
@DeveloperApi
case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"
}
67
/** Shuffle index block id, named `shuffle_<shuffleId>_<mapId>_<reduceId>.index`. */
@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"
}
72
/**
 * Broadcast block id. Named `broadcast_<broadcastId>` when `field` is empty, and
 * `broadcast_<broadcastId>_<field>` otherwise.
 */
@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  // A literal pattern compares with ==, so a null field falls through to the suffixed form.
  override def name: String = field match {
    case "" => s"broadcast_$broadcastId"
    case suffix => s"broadcast_${broadcastId}_$suffix"
  }
}
77
/** Block id keyed by a task id; named `taskresult_<taskId>`. */
@DeveloperApi
case class TaskResultBlockId(taskId: Long) extends BlockId {
  override def name: String = s"taskresult_$taskId"
}
82
/** Block id keyed by a stream id and a unique id; named `input-<streamId>-<uniqueId>`. */
@DeveloperApi
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  override def name: String = s"input-${streamId}-${uniqueId}"
}
87
/**
 * Id associated with temporary local data managed as blocks; named `temp_local_<id>`.
 * Not serializable: BlockId.apply() has no pattern for this name format.
 */
private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
  override def name: String = s"temp_local_$id"
}
92
/**
 * Id associated with temporary shuffle data managed as blocks; named `temp_shuffle_<id>`.
 * Not serializable: BlockId.apply() has no pattern for this name format.
 */
private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
  override def name: String = s"temp_shuffle_$id"
}
97
/** Block id intended only for testing purposes; named `test_<id>`. */
private[spark] case class TestBlockId(id: String) extends BlockId {
  override def name: String = s"test_$id"
}
102
/**
 * :: DeveloperApi ::
 * Parsing support for [[BlockId]] names.
 *
 * Each pattern below mirrors the `name` format of one BlockId subclass. Scala regex extractors
 * (`case RDD(a, b) => ...`) only succeed when the pattern matches the entire input string, so
 * no `^`/`$` anchors are needed.
 */
@DeveloperApi
object BlockId {
  import scala.util.matching.Regex

  val RDD: Regex = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE: Regex = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  // The "." before the extension is escaped. Unescaped, it matched any character, so a name such
  // as "shuffle_1_2_34data" was wrongly parsed as ShuffleDataBlockId(1, 2, 3).
  val SHUFFLE_DATA: Regex = """shuffle_([0-9]+)_([0-9]+)_([0-9]+)\.data""".r
  val SHUFFLE_INDEX: Regex = """shuffle_([0-9]+)_([0-9]+)_([0-9]+)\.index""".r
  // The second group captures the separating "_" (if any) before the field; apply() strips it.
  val BROADCAST: Regex = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  val TASKRESULT: Regex = "taskresult_([0-9]+)".r
  val STREAM: Regex = "input-([0-9]+)-([0-9]+)".r
  val TEST: Regex = "test_(.*)".r

  /**
   * Converts a BlockId "name" String back into a BlockId.
   *
   * Temporary ids (TempLocalBlockId, TempShuffleBlockId) have no pattern here and are rejected.
   *
   * @param id a block name in one of the formats produced by a BlockId subclass's `name`
   * @return the BlockId whose name format matches `id`
   * @throws IllegalStateException if `id` matches none of the known formats
   * @throws NumberFormatException if a numeric component does not fit its Int/Long field
   */
  def apply(id: String): BlockId = id match {
    case RDD(rddId, splitIndex) =>
      RDDBlockId(rddId.toInt, splitIndex.toInt)
    case SHUFFLE(shuffleId, mapId, reduceId) =>
      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
      ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
      ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case BROADCAST(broadcastId, field) =>
      BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
    case TASKRESULT(taskId) =>
      TaskResultBlockId(taskId.toLong)
    case STREAM(streamId, uniqueId) =>
      StreamBlockId(streamId.toInt, uniqueId.toLong)
    case TEST(value) =>
      TestBlockId(value)
    case _ =>
      throw new IllegalStateException("Unrecognized BlockId: " + id)
  }
}
136