/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import scala.reflect.ClassTag

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
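
// Illustrative sketch (not part of this file): every RDD exposes its
// dependencies via `RDD.dependencies`, which the scheduler walks to build the
// lineage graph. Assumes a live SparkContext named `sc`:
//
//   val doubled = sc.parallelize(1 to 3).map(_ * 2)
//   doubled.dependencies            // Seq(OneToOneDependency(...))
//   doubled.dependencies.head.rdd   // the parent RDD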


/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
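
// Minimal sketch of a custom NarrowDependency (hypothetical, for illustration
// only): each child partition depends on exactly two parent partitions,
// halving the partition count without a shuffle.
//
//   class PairwiseDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
//     override def getParents(partitionId: Int): Seq[Int] =
//       Seq(2 * partitionId, 2 * partitionId + 1)
//   }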


/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffle
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  // A unique ID for this shuffle, assigned by the SparkContext.
  val shuffleId: Int = _rdd.context.newShuffleId()

  // Register the shuffle with the shuffle manager; the returned handle is
  // later used by tasks to read and write the shuffle data.
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  // Register with the ContextCleaner (if enabled) so the shuffle's files are
  // removed once this dependency is garbage collected.
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
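
// Illustrative sketch (assumes a live SparkContext named `sc`): key-based
// transformations such as reduceByKey create a ShuffleDependency under the
// hood.
//
//   val counts = sc.parallelize(Seq(("a", 1), ("a", 2))).reduceByKey(_ + _)
//   val dep = counts.dependencies.head
//     .asInstanceOf[ShuffleDependency[String, Int, Int]]
//   dep.mapSideCombine   // true: reduceByKey partially aggregates map-side
//   dep.partitioner      // the partitioner chosen by defaultPartitioner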


/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
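
// Illustrative sketch (assumes a live SparkContext named `sc`): element-wise
// transformations such as map and filter produce a OneToOneDependency, so
// partition i of the child is computed from partition i of the parent alone.
//
//   val nums = sc.parallelize(1 to 10, numSlices = 4)
//   val even = nums.filter(_ % 2 == 0)
//   even.dependencies.head.isInstanceOf[OneToOneDependency[_]]   // true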


/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

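// Illustrative sketch (assumes a live SparkContext named `sc`): union is the
// classic source of RangeDependency; each parent's partitions map, shifted,
// onto a contiguous range of the child's partitions.
//
//   val a = sc.parallelize(1 to 4, numSlices = 2)   // child partitions 0-1
//   val b = sc.parallelize(5 to 9, numSlices = 3)   // child partitions 2-4
//   val u = a.union(b)                              // 5 partitions in total
//   // u's dependencies are, in effect:
//   //   RangeDependency(a, inStart = 0, outStart = 0, length = 2)
//   //   RangeDependency(b, inStart = 0, outStart = 2, length = 3)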