/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import scala.reflect.ClassTag

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}


/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}


/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}


/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}


/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
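
// A minimal, hypothetical sketch (not part of the original file) of how the
// dependency types above surface from common transformations. The object name
// and the local SparkContext setup are assumptions made for illustration only;
// everything else uses standard SparkContext/RDD API.
private[spark] object DependencyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("dependency-example"))
    try {
      val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 4)

      // mapValues transforms each partition independently, so the child RDD
      // has a OneToOneDependency on `pairs`.
      val mapped = pairs.mapValues(_ + 1)

      // reduceByKey must co-locate all values for a key, so the child RDD
      // declares a ShuffleDependency on `mapped`.
      val reduced = mapped.reduceByKey(_ + _)

      // Unioning RDDs with different partitioners produces a UnionRDD with one
      // RangeDependency per parent: the child's partition index maps back into
      // the corresponding parent's range via (partitionId - outStart + inStart).
      val unioned = mapped.union(reduced)

      assert(mapped.dependencies.head.isInstanceOf[OneToOneDependency[_]])
      assert(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])
      assert(unioned.dependencies.forall(_.isInstanceOf[RangeDependency[_]]))
    } finally {
      sc.stop()
    }
  }
}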