/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import java.util.UUID

import scala.util.matching.Regex

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Identifies a particular Block of data, usually associated with a single file.
 * A Block can be uniquely identified by its filename, but each type of Block has a different
 * set of keys which produce its unique name.
 *
 * `equals` and `hashCode` are defined here in terms of the concrete class and [[name]]. Because
 * they are concrete in this base class, the case classes below inherit them instead of getting
 * synthesized field-wise versions: two ids are equal iff they have the same class and name.
 *
 * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
 */
@DeveloperApi
sealed abstract class BlockId {
  /** A globally unique identifier for this Block. Can be used for ser/de. */
  def name: String

  // convenience methods

  /** Returns `Some(this)` narrowed to [[RDDBlockId]] if this is an RDD block, otherwise `None`. */
  def asRDDId: Option[RDDBlockId] = this match {
    case rddBlockId: RDDBlockId => Some(rddBlockId)
    case _ => None
  }
  def isRDD: Boolean = isInstanceOf[RDDBlockId]
  // Note: only matches ShuffleBlockId, not ShuffleDataBlockId / ShuffleIndexBlockId.
  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

  override def toString: String = name
  override def hashCode: Int = name.hashCode
  override def equals(other: Any): Boolean = other match {
    case o: BlockId => getClass == o.getClass && name.equals(o.name)
    case _ => false
  }
}

@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = s"rdd_${rddId}_${splitIndex}"
}

// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}

@DeveloperApi
case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"
}

@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"
}

@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  // An empty field is omitted entirely; otherwise it is appended after an underscore.
  override def name: String =
    s"broadcast_${broadcastId}" + (if (field == "") "" else s"_${field}")
}

@DeveloperApi
case class TaskResultBlockId(taskId: Long) extends BlockId {
  override def name: String = s"taskresult_${taskId}"
}

@DeveloperApi
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  override def name: String = s"input-${streamId}-${uniqueId}"
}

/** Id associated with temporary local data managed as blocks. Not serializable. */
private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
  override def name: String = s"temp_local_${id}"
}

/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
  override def name: String = s"temp_shuffle_${id}"
}

// Intended only for testing purposes
private[spark] case class TestBlockId(id: String) extends BlockId {
  override def name: String = s"test_${id}"
}

@DeveloperApi
object BlockId {
  // Parsers for the `name` formats produced above. Scala's Regex extractor (used in the
  // `match` in apply) requires the whole string to match, so no ^/$ anchors are needed.
  // Literal dots must be escaped: an unescaped "." matches any character, which previously
  // let a malformed name such as "shuffle_1_2_34data" parse as ShuffleDataBlockId(1, 2, 3).
  val RDD: Regex = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE: Regex = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val SHUFFLE_DATA: Regex = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)\\.data".r
  val SHUFFLE_INDEX: Regex = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)\\.index".r
  // The second group captures the optional "_field" suffix, including its leading underscore.
  val BROADCAST: Regex = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  val TASKRESULT: Regex = "taskresult_([0-9]+)".r
  val STREAM: Regex = "input-([0-9]+)-([0-9]+)".r
  val TEST: Regex = "test_(.*)".r

  /**
   * Converts a BlockId "name" String back into a BlockId.
   *
   * Only the formats listed above are recognized; TempLocalBlockId and TempShuffleBlockId
   * names are not parsed here.
   *
   * @param id the block name, as produced by [[BlockId.name]]
   * @return the BlockId whose `name` is `id`
   * @throws IllegalStateException if `id` matches none of the known formats
   * @throws NumberFormatException if a numeric component is too large for its Int/Long field
   */
  def apply(id: String): BlockId = id match {
    case RDD(rddId, splitIndex) =>
      RDDBlockId(rddId.toInt, splitIndex.toInt)
    case SHUFFLE(shuffleId, mapId, reduceId) =>
      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
      ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
      ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case BROADCAST(broadcastId, field) =>
      BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
    case TASKRESULT(taskId) =>
      TaskResultBlockId(taskId.toLong)
    case STREAM(streamId, uniqueId) =>
      StreamBlockId(streamId.toInt, uniqueId.toLong)
    case TEST(value) =>
      TestBlockId(value)
    case _ =>
      throw new IllegalStateException("Unrecognized BlockId: " + id)
  }
}