1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.rdd
19
20import java.util.Random
21
22import scala.collection.{mutable, Map}
23import scala.collection.mutable.ArrayBuffer
24import scala.io.Codec
25import scala.language.implicitConversions
26import scala.reflect.{classTag, ClassTag}
27
28import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
29import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
30import org.apache.hadoop.io.compress.CompressionCodec
31import org.apache.hadoop.mapred.TextOutputFormat
32
33import org.apache.spark._
34import org.apache.spark.Partitioner._
35import org.apache.spark.annotation.{DeveloperApi, Since}
36import org.apache.spark.api.java.JavaRDD
37import org.apache.spark.internal.Logging
38import org.apache.spark.partial.BoundedDouble
39import org.apache.spark.partial.CountEvaluator
40import org.apache.spark.partial.GroupedCountEvaluator
41import org.apache.spark.partial.PartialResult
42import org.apache.spark.storage.{RDDBlockId, StorageLevel}
43import org.apache.spark.util.{BoundedPriorityQueue, Utils}
44import org.apache.spark.util.collection.OpenHashMap
45import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
46  SamplingUtils}
47
48/**
49 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
50 * partitioned collection of elements that can be operated on in parallel. This class contains the
51 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
52 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
53 * pairs, such as `groupByKey` and `join`;
54 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
55 * Doubles; and
56 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
57 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
 * through implicit conversions.
60 *
61 * Internally, each RDD is characterized by five main properties:
62 *
63 *  - A list of partitions
64 *  - A function for computing each split
65 *  - A list of dependencies on other RDDs
66 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
67 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
68 *    an HDFS file)
69 *
70 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
71 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
72 * reading data from a new storage system) by overriding these functions. Please refer to the
73 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
74 * for more details on RDD internals.
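 *
 * For illustration only, a minimal usage sketch (it assumes an already-created SparkContext
 * named `sc` and hypothetical example data):
 * {{{
 *   val nums = sc.parallelize(1 to 4)                 // RDD[Int]
 *   val evens = nums.filter(_ % 2 == 0).map(_ * 10)   // transformations are lazy
 *   evens.persist()                                   // mark the result for caching
 *   evens.count()                                     // an action triggers evaluation; returns 2
 * }}}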
75 */
76abstract class RDD[T: ClassTag](
77    @transient private var _sc: SparkContext,
78    @transient private var deps: Seq[Dependency[_]]
79  ) extends Serializable with Logging {
80
81  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
82    // This is a warning instead of an exception in order to avoid breaking user programs that
83    // might have defined nested RDDs without running jobs with them.
84    logWarning("Spark does not support nested RDDs (see SPARK-5063)")
85  }
86
87  private def sc: SparkContext = {
88    if (_sc == null) {
89      throw new SparkException(
90        "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " +
91        "transformations and actions are NOT invoked by the driver, but inside of other " +
92        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
93        "because the values transformation and count action cannot be performed inside of the " +
94        "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
95        "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
96        "an RDD not defined by the streaming job is used in DStream operations. For more " +
97        "information, See SPARK-13758.")
98    }
99    _sc
100  }
101
102  /** Construct an RDD with just a one-to-one dependency on one parent */
103  def this(@transient oneParent: RDD[_]) =
104    this(oneParent.context, List(new OneToOneDependency(oneParent)))
105
106  private[spark] def conf = sc.conf
107  // =======================================================================
108  // Methods that should be implemented by subclasses of RDD
109  // =======================================================================
110
111  /**
112   * :: DeveloperApi ::
113   * Implemented by subclasses to compute a given partition.
114   */
115  @DeveloperApi
116  def compute(split: Partition, context: TaskContext): Iterator[T]
117
118  /**
119   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
120   * be called once, so it is safe to implement a time-consuming computation in it.
121   *
122   * The partitions in this array must satisfy the following property:
123   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
124   */
125  protected def getPartitions: Array[Partition]
126
127  /**
128   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
129   * be called once, so it is safe to implement a time-consuming computation in it.
130   */
131  protected def getDependencies: Seq[Dependency[_]] = deps
132
133  /**
134   * Optionally overridden by subclasses to specify placement preferences.
135   */
136  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
137
138  /** Optionally overridden by subclasses to specify how they are partitioned. */
139  @transient val partitioner: Option[Partitioner] = None
140
141  // =======================================================================
142  // Methods and fields available on all RDDs
143  // =======================================================================
144
145  /** The SparkContext that created this RDD. */
146  def sparkContext: SparkContext = sc
147
148  /** A unique ID for this RDD (within its SparkContext). */
149  val id: Int = sc.newRddId()
150
151  /** A friendly name for this RDD */
152  @transient var name: String = null
153
154  /** Assign a name to this RDD */
155  def setName(_name: String): this.type = {
156    name = _name
157    this
158  }
159
160  /**
161   * Mark this RDD for persisting using the specified level.
162   *
163   * @param newLevel the target storage level
164   * @param allowOverride whether to override any existing level with the new one
165   */
166  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
167    // TODO: Handle changes of StorageLevel
168    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
169      throw new UnsupportedOperationException(
170        "Cannot change storage level of an RDD after it was already assigned a level")
171    }
172    // If this is the first time this RDD is marked for persisting, register it
173    // with the SparkContext for cleanups and accounting. Do this only once.
174    if (storageLevel == StorageLevel.NONE) {
175      sc.cleaner.foreach(_.registerRDDForCleanup(this))
176      sc.persistRDD(this)
177    }
178    storageLevel = newLevel
179    this
180  }
181
182  /**
183   * Set this RDD's storage level to persist its values across operations after the first time
184   * it is computed. This can only be used to assign a new storage level if the RDD does not
185   * have a storage level set yet. Local checkpointing is an exception.
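   *
   * For example, a sketch (assumes an existing RDD `rdd`):
   * {{{
   *   import org.apache.spark.storage.StorageLevel
   *   rdd.persist(StorageLevel.MEMORY_AND_DISK)   // spill partitions to disk if memory is short
   * }}}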
186   */
187  def persist(newLevel: StorageLevel): this.type = {
188    if (isLocallyCheckpointed) {
189      // This means the user previously called localCheckpoint(), which should have already
190      // marked this RDD for persisting. Here we should override the old storage level with
191      // one that is explicitly requested by the user (after adapting it to use disk).
192      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
193    } else {
194      persist(newLevel, allowOverride = false)
195    }
196  }
197
198  /**
199   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
200   */
201  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
202
203  /**
204   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
205   */
206  def cache(): this.type = persist()
207
208  /**
209   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
210   *
211   * @param blocking Whether to block until all blocks are deleted.
212   * @return This RDD.
213   */
214  def unpersist(blocking: Boolean = true): this.type = {
215    logInfo("Removing RDD " + id + " from persistence list")
216    sc.unpersistRDD(id, blocking)
217    storageLevel = StorageLevel.NONE
218    this
219  }
220
221  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
222  def getStorageLevel: StorageLevel = storageLevel
223
224  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
225  // be overwritten when we're checkpointed
226  private var dependencies_ : Seq[Dependency[_]] = null
227  @transient private var partitions_ : Array[Partition] = null
228
229  /** An Option holding our checkpoint RDD, if we are checkpointed */
230  private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)
231
232  /**
233   * Get the list of dependencies of this RDD, taking into account whether the
234   * RDD is checkpointed or not.
235   */
236  final def dependencies: Seq[Dependency[_]] = {
237    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
238      if (dependencies_ == null) {
239        dependencies_ = getDependencies
240      }
241      dependencies_
242    }
243  }
244
245  /**
246   * Get the array of partitions of this RDD, taking into account whether the
247   * RDD is checkpointed or not.
248   */
249  final def partitions: Array[Partition] = {
250    checkpointRDD.map(_.partitions).getOrElse {
251      if (partitions_ == null) {
252        partitions_ = getPartitions
253        partitions_.zipWithIndex.foreach { case (partition, index) =>
254          require(partition.index == index,
255            s"partitions($index).partition == ${partition.index}, but it should equal $index")
256        }
257      }
258      partitions_
259    }
260  }
261
262  /**
263   * Returns the number of partitions of this RDD.
264   */
265  @Since("1.6.0")
266  final def getNumPartitions: Int = partitions.length
267
268  /**
269   * Get the preferred locations of a partition, taking into account whether the
270   * RDD is checkpointed.
271   */
272  final def preferredLocations(split: Partition): Seq[String] = {
273    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
274      getPreferredLocations(split)
275    }
276  }
277
278  /**
279   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
280   * This should ''not'' be called by users directly, but is available for implementors of custom
281   * subclasses of RDD.
282   */
283  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
284    if (storageLevel != StorageLevel.NONE) {
285      getOrCompute(split, context)
286    } else {
287      computeOrReadCheckpoint(split, context)
288    }
289  }
290
291  /**
292   * Return the ancestors of the given RDD that are related to it only through a sequence of
293   * narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
294   * no ordering on the RDDs returned.
295   */
296  private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
297    val ancestors = new mutable.HashSet[RDD[_]]
298
299    def visit(rdd: RDD[_]) {
300      val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
301      val narrowParents = narrowDependencies.map(_.rdd)
302      val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
303      narrowParentsNotVisited.foreach { parent =>
304        ancestors.add(parent)
305        visit(parent)
306      }
307    }
308
309    visit(this)
310
311    // In case there is a cycle, do not include the root itself
312    ancestors.filterNot(_ == this).toSeq
313  }
314
315  /**
316   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
317   */
318  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
319  {
320    if (isCheckpointedAndMaterialized) {
321      firstParent[T].iterator(split, context)
322    } else {
323      compute(split, context)
324    }
325  }
326
327  /**
328   * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
329   */
330  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
331    val blockId = RDDBlockId(id, partition.index)
332    var readCachedBlock = true
    // This method is called on executors, so we need to call SparkEnv.get instead of sc.env.
334    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
335      readCachedBlock = false
336      computeOrReadCheckpoint(partition, context)
337    }) match {
338      case Left(blockResult) =>
339        if (readCachedBlock) {
340          val existingMetrics = context.taskMetrics().inputMetrics
341          existingMetrics.incBytesRead(blockResult.bytes)
342          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
343            override def next(): T = {
344              existingMetrics.incRecordsRead(1)
345              delegate.next()
346            }
347          }
348        } else {
349          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
350        }
351      case Right(iter) =>
352        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
353    }
354  }
355
356  /**
357   * Execute a block of code in a scope such that all new RDDs created in this body will
358   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
359   *
360   * Note: Return statements are NOT allowed in the given body.
361   */
362  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
363
364  // Transformations (return a new RDD)
365
366  /**
367   * Return a new RDD by applying a function to all elements of this RDD.
368   */
369  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
370    val cleanF = sc.clean(f)
371    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
372  }
373
374  /**
375   *  Return a new RDD by first applying a function to all elements of this
376   *  RDD, and then flattening the results.
377   */
378  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
379    val cleanF = sc.clean(f)
380    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
381  }
382
383  /**
384   * Return a new RDD containing only the elements that satisfy a predicate.
385   */
386  def filter(f: T => Boolean): RDD[T] = withScope {
387    val cleanF = sc.clean(f)
388    new MapPartitionsRDD[T, T](
389      this,
390      (context, pid, iter) => iter.filter(cleanF),
391      preservesPartitioning = true)
392  }
393
394  /**
395   * Return a new RDD containing the distinct elements in this RDD.
396   */
397  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
398    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
399  }
400
401  /**
402   * Return a new RDD containing the distinct elements in this RDD.
403   */
404  def distinct(): RDD[T] = withScope {
405    distinct(partitions.length)
406  }
407
408  /**
409   * Return a new RDD that has exactly numPartitions partitions.
410   *
411   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
412   * a shuffle to redistribute data.
413   *
414   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
415   * which can avoid performing a shuffle.
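   *
   * A small sketch (assumes an existing RDD `rdd`):
   * {{{
   *   val wider = rdd.repartition(200)   // always shuffles
   *   val fewer = rdd.coalesce(10)       // usually avoids a shuffle when shrinking
   * }}}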
416   */
417  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
418    coalesce(numPartitions, shuffle = true)
419  }
420
421  /**
422   * Return a new RDD that is reduced into `numPartitions` partitions.
423   *
424   * This results in a narrow dependency, e.g. if you go from 1000 partitions
425   * to 100 partitions, there will not be a shuffle, instead each of the 100
426   * new partitions will claim 10 of the current partitions. If a larger number
427   * of partitions is requested, it will stay at the current number of partitions.
428   *
429   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
430   * this may result in your computation taking place on fewer nodes than
431   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
432   * you can pass shuffle = true. This will add a shuffle step, but means the
433   * current upstream partitions will be executed in parallel (per whatever
434   * the current partitioning is).
435   *
436   * @note With shuffle = true, you can actually coalesce to a larger number
437   * of partitions. This is useful if you have a small number of partitions,
438   * say 100, potentially with a few partitions being abnormally large. Calling
439   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
440   * data distributed using a hash partitioner. The optional partition coalescer
441   * passed in must be serializable.
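   *
   * For illustration (assumes an existing RDD `rdd` with many partitions):
   * {{{
   *   val merged = rdd.coalesce(10)                  // narrow dependency, no shuffle
   *   val single = rdd.coalesce(1, shuffle = true)   // shuffle keeps upstream work parallel
   * }}}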
442   */
443  def coalesce(numPartitions: Int, shuffle: Boolean = false,
444               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
445              (implicit ord: Ordering[T] = null)
446      : RDD[T] = withScope {
447    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
448    if (shuffle) {
449      /** Distributes elements evenly across output partitions, starting from a random partition. */
450      val distributePartition = (index: Int, items: Iterator[T]) => {
451        var position = (new Random(index)).nextInt(numPartitions)
452        items.map { t =>
453          // Note that the hash code of the key will just be the key itself. The HashPartitioner
454          // will mod it with the number of total partitions.
455          position = position + 1
456          (position, t)
457        }
458      } : Iterator[(Int, T)]
459
460      // include a shuffle step so that our upstream tasks are still distributed
461      new CoalescedRDD(
462        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
463        new HashPartitioner(numPartitions)),
464        numPartitions,
465        partitionCoalescer).values
466    } else {
467      new CoalescedRDD(this, numPartitions, partitionCoalescer)
468    }
469  }
470
471  /**
472   * Return a sampled subset of this RDD.
473   *
474   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
475   * @param fraction expected size of the sample as a fraction of this RDD's size
   *  without replacement: probability that each element is chosen; fraction must be in [0, 1]
477   *  with replacement: expected number of times each element is chosen; fraction must be greater
478   *  than or equal to 0
479   * @param seed seed for the random number generator
480   *
481   * @note This is NOT guaranteed to provide exactly the fraction of the count
482   * of the given [[RDD]].
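   *
   * A sketch of typical use (assumes an existing RDD `rdd`):
   * {{{
   *   val tenPercent = rdd.sample(withReplacement = false, fraction = 0.1, seed = 42L)
   * }}}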
483   */
484  def sample(
485      withReplacement: Boolean,
486      fraction: Double,
487      seed: Long = Utils.random.nextLong): RDD[T] = {
488    require(fraction >= 0,
489      s"Fraction must be nonnegative, but got ${fraction}")
490
491    withScope {
492      require(fraction >= 0.0, "Negative fraction value: " + fraction)
493      if (withReplacement) {
494        new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
495      } else {
496        new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
497      }
498    }
499  }
500
501  /**
502   * Randomly splits this RDD with the provided weights.
503   *
504   * @param weights weights for splits, will be normalized if they don't sum to 1
505   * @param seed random seed
506   *
507   * @return split RDDs in an array
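   *
   * For example, a sketch (assumes an existing RDD `rdd`):
   * {{{
   *   val Array(train, test) = rdd.randomSplit(Array(0.8, 0.2), seed = 11L)
   * }}}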
508   */
509  def randomSplit(
510      weights: Array[Double],
511      seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
512    require(weights.forall(_ >= 0),
513      s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
514    require(weights.sum > 0,
515      s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
516
517    withScope {
518      val sum = weights.sum
519      val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
520      normalizedCumWeights.sliding(2).map { x =>
521        randomSampleWithRange(x(0), x(1), seed)
522      }.toArray
523    }
524  }
525
526
527  /**
528   * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability
529   * range.
530   * @param lb lower bound to use for the Bernoulli sampler
531   * @param ub upper bound to use for the Bernoulli sampler
532   * @param seed the seed for the Random number generator
533   * @return A random sub-sample of the RDD without replacement.
534   */
535  private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
536    this.mapPartitionsWithIndex( { (index, partition) =>
537      val sampler = new BernoulliCellSampler[T](lb, ub)
538      sampler.setSeed(seed + index)
539      sampler.sample(partition)
540    }, preservesPartitioning = true)
541  }
542
543  /**
544   * Return a fixed-size sampled subset of this RDD in an array
545   *
546   * @param withReplacement whether sampling is done with replacement
547   * @param num size of the returned sample
548   * @param seed seed for the random number generator
549   * @return sample of specified size in an array
550   *
551   * @note this method should only be used if the resulting array is expected to be small, as
552   * all the data is loaded into the driver's memory.
553   */
554  def takeSample(
555      withReplacement: Boolean,
556      num: Int,
557      seed: Long = Utils.random.nextLong): Array[T] = withScope {
558    val numStDev = 10.0
559
560    require(num >= 0, "Negative number of elements requested")
561    require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
562      "Cannot support a sample size > Int.MaxValue - " +
563      s"$numStDev * math.sqrt(Int.MaxValue)")
564
565    if (num == 0) {
566      new Array[T](0)
567    } else {
568      val initialCount = this.count()
569      if (initialCount == 0) {
570        new Array[T](0)
571      } else {
572        val rand = new Random(seed)
573        if (!withReplacement && num >= initialCount) {
574          Utils.randomizeInPlace(this.collect(), rand)
575        } else {
576          val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
577            withReplacement)
578          var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
579
580          // If the first sample didn't turn out large enough, keep trying to take samples;
581          // this shouldn't happen often because we use a big multiplier for the initial size
582          var numIters = 0
583          while (samples.length < num) {
584            logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
585            samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
586            numIters += 1
587          }
588          Utils.randomizeInPlace(samples, rand).take(num)
589        }
590      }
591    }
592  }
593
594  /**
595   * Return the union of this RDD and another one. Any identical elements will appear multiple
596   * times (use `.distinct()` to eliminate them).
597   */
598  def union(other: RDD[T]): RDD[T] = withScope {
599    sc.union(this, other)
600  }
601
602  /**
603   * Return the union of this RDD and another one. Any identical elements will appear multiple
604   * times (use `.distinct()` to eliminate them).
605   */
606  def ++(other: RDD[T]): RDD[T] = withScope {
607    this.union(other)
608  }
609
610  /**
611   * Return this RDD sorted by the given key function.
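   *
   * For illustration (assumes a SparkContext named `sc` and hypothetical data):
   * {{{
   *   val byAge = sc.parallelize(Seq(("alice", 31), ("bob", 25)))
   *     .sortBy(_._2)   // ("bob", 25) comes first
   * }}}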
612   */
613  def sortBy[K](
614      f: (T) => K,
615      ascending: Boolean = true,
616      numPartitions: Int = this.partitions.length)
617      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
618    this.keyBy[K](f)
619        .sortByKey(ascending, numPartitions)
620        .values
621  }
622
623  /**
624   * Return the intersection of this RDD and another one. The output will not contain any duplicate
625   * elements, even if the input RDDs did.
626   *
627   * @note This method performs a shuffle internally.
628   */
629  def intersection(other: RDD[T]): RDD[T] = withScope {
630    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
631        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
632        .keys
633  }
634
635  /**
636   * Return the intersection of this RDD and another one. The output will not contain any duplicate
637   * elements, even if the input RDDs did.
638   *
639   * @note This method performs a shuffle internally.
640   *
641   * @param partitioner Partitioner to use for the resulting RDD
642   */
643  def intersection(
644      other: RDD[T],
645      partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
646    this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
647        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
648        .keys
649  }
650
651  /**
652   * Return the intersection of this RDD and another one. The output will not contain any duplicate
653   * elements, even if the input RDDs did.  Performs a hash partition across the cluster
654   *
655   * @note This method performs a shuffle internally.
656   *
657   * @param numPartitions How many partitions to use in the resulting RDD
658   */
659  def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
660    intersection(other, new HashPartitioner(numPartitions))
661  }
662
663  /**
664   * Return an RDD created by coalescing all elements within each partition into an array.
665   */
666  def glom(): RDD[Array[T]] = withScope {
667    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
668  }
669
670  /**
671   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
672   * elements (a, b) where a is in `this` and b is in `other`.
673   */
674  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
675    new CartesianRDD(sc, this, other)
676  }
677
678  /**
679   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
680   * mapping to that key. The ordering of elements within each group is not guaranteed, and
681   * may even differ each time the resulting RDD is evaluated.
682   *
683   * @note This operation may be very expensive. If you are grouping in order to perform an
684   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
685   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
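   *
   * A sketch contrasting the two approaches (assumes a SparkContext named `sc`):
   * {{{
   *   val nums = sc.parallelize(1 to 10)
   *   val grouped = nums.groupBy(_ % 3)                          // RDD[(Int, Iterable[Int])]
   *   val summed = nums.map(n => (n % 3, n)).reduceByKey(_ + _)  // usually far cheaper
   * }}}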
686   */
687  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
688    groupBy[K](f, defaultPartitioner(this))
689  }
690
691  /**
692   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
693   * mapping to that key. The ordering of elements within each group is not guaranteed, and
694   * may even differ each time the resulting RDD is evaluated.
695   *
696   * @note This operation may be very expensive. If you are grouping in order to perform an
697   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
698   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
699   */
700  def groupBy[K](
701      f: T => K,
702      numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
703    groupBy(f, new HashPartitioner(numPartitions))
704  }
705
706  /**
707   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
708   * mapping to that key. The ordering of elements within each group is not guaranteed, and
709   * may even differ each time the resulting RDD is evaluated.
710   *
711   * @note This operation may be very expensive. If you are grouping in order to perform an
712   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
713   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
714   */
715  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
716      : RDD[(K, Iterable[T])] = withScope {
717    val cleanF = sc.clean(f)
718    this.map(t => (cleanF(t), t)).groupByKey(p)
719  }
720
721  /**
722   * Return an RDD created by piping elements to a forked external process.
723   */
724  def pipe(command: String): RDD[String] = withScope {
725    // Similar to Runtime.exec(), if we are given a single string, split it into words
726    // using a standard StringTokenizer (i.e. by spaces)
727    pipe(PipedRDD.tokenize(command))
728  }
729
730  /**
731   * Return an RDD created by piping elements to a forked external process.
732   */
733  def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
734    // Similar to Runtime.exec(), if we are given a single string, split it into words
735    // using a standard StringTokenizer (i.e. by spaces)
736    pipe(PipedRDD.tokenize(command), env)
737  }
738
739  /**
740   * Return an RDD created by piping elements to a forked external process. The resulting RDD
741   * is computed by executing the given process once per partition. All elements
742   * of each input partition are written to a process's stdin as lines of input separated
743   * by a newline. The resulting partition consists of the process's stdout output, with
744   * each line of stdout resulting in one element of the output partition. A process is invoked
745   * even for empty partitions.
746   *
747   * The print behavior can be customized by providing two functions.
748   *
749   * @param command command to run in forked process.
750   * @param env environment variables to set.
751   * @param printPipeContext Before piping elements, this function is called as an opportunity
752   *                         to pipe context data. Print line function (like out.println) will be
753   *                         passed as printPipeContext's parameter.
754   * @param printRDDElement Use this function to customize how to pipe elements. This function
755   *                        will be called with each RDD element as the 1st parameter, and the
756   *                        print line function (like out.println()) as the 2nd parameter.
757   *                        An example of pipe the RDD data of groupBy() in a streaming way,
758   *                        instead of constructing a huge String to concat all the elements:
759   *                        {{{
760   *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
761   *                          for (e <- record._2) {f(e)}
762   *                        }}}
763   * @param separateWorkingDir Use separate working directories for each task.
764   * @param bufferSize Buffer size for the stdin writer for the piped process.
765   * @param encoding Char encoding used for interacting (via stdin, stdout and stderr) with
766   *                 the piped process
767   * @return the result RDD
768   */
769  def pipe(
770      command: Seq[String],
771      env: Map[String, String] = Map(),
772      printPipeContext: (String => Unit) => Unit = null,
773      printRDDElement: (T, String => Unit) => Unit = null,
774      separateWorkingDir: Boolean = false,
775      bufferSize: Int = 8192,
776      encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope {
777    new PipedRDD(this, command, env,
778      if (printPipeContext ne null) sc.clean(printPipeContext) else null,
779      if (printRDDElement ne null) sc.clean(printRDDElement) else null,
780      separateWorkingDir,
781      bufferSize,
782      encoding)
783  }
784
785  /**
786   * Return a new RDD by applying a function to each partition of this RDD.
787   *
788   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
789   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
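   *
   * For illustration, a self-contained sketch that counts the elements in each partition
   * (assumes an existing RDD `rdd`):
   * {{{
   *   val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.size))
   * }}}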
790   */
791  def mapPartitions[U: ClassTag](
792      f: Iterator[T] => Iterator[U],
793      preservesPartitioning: Boolean = false): RDD[U] = withScope {
794    val cleanedF = sc.clean(f)
795    new MapPartitionsRDD(
796      this,
797      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
798      preservesPartitioning)
799  }
800
801  /**
802   * [performance] Spark's internal mapPartitionsWithIndex method that skips closure cleaning.
803   * It is a performance API to be used carefully only if we are sure that the RDD elements are
804   * serializable and don't require closure cleaning.
805   *
806   * @param preservesPartitioning indicates whether the input function preserves the partitioner,
807   * which should be `false` unless this is a pair RDD and the input function doesn't modify
808   * the keys.
809   */
810  private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
811      f: (Int, Iterator[T]) => Iterator[U],
812      preservesPartitioning: Boolean = false): RDD[U] = withScope {
813    new MapPartitionsRDD(
814      this,
815      (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
816      preservesPartitioning)
817  }
818
819  /**
820   * [performance] Spark's internal mapPartitions method that skips closure cleaning.
821   */
822  private[spark] def mapPartitionsInternal[U: ClassTag](
823      f: Iterator[T] => Iterator[U],
824      preservesPartitioning: Boolean = false): RDD[U] = withScope {
825    new MapPartitionsRDD(
826      this,
827      (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
828      preservesPartitioning)
829  }
830
831  /**
832   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
833   * of the original partition.
834   *
835   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
836   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
837   */
838  def mapPartitionsWithIndex[U: ClassTag](
839      f: (Int, Iterator[T]) => Iterator[U],
840      preservesPartitioning: Boolean = false): RDD[U] = withScope {
841    val cleanedF = sc.clean(f)
842    new MapPartitionsRDD(
843      this,
844      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
845      preservesPartitioning)
846  }
847
848  /**
849   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
850   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
851   * partitions* and the *same number of elements in each partition* (e.g. one was made through
852   * a map on the other).
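   *
   * For example, a sketch (assumes a SparkContext named `sc`):
   * {{{
   *   val a = sc.parallelize(Seq(1, 2, 3), 2)
   *   val b = a.map(_.toString)          // same partitioning, same element counts
   *   a.zip(b).collect()                 // Array((1,"1"), (2,"2"), (3,"3"))
   * }}}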
853   */
854  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
855    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
856      new Iterator[(T, U)] {
857        def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
858          case (true, true) => true
859          case (false, false) => false
860          case _ => throw new SparkException("Can only zip RDDs with " +
861            "same number of elements in each partition")
862        }
863        def next(): (T, U) = (thisIter.next(), otherIter.next())
864      }
865    }
866  }
867
868  /**
869   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
870   * applying a function to the zipped partitions. Assumes that all the RDDs have the
871   * *same number of partitions*, but does *not* require them to have the same number
872   * of elements in each partition.
873   */
874  def zipPartitions[B: ClassTag, V: ClassTag]
875      (rdd2: RDD[B], preservesPartitioning: Boolean)
876      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
877    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
878  }
879
880  def zipPartitions[B: ClassTag, V: ClassTag]
881      (rdd2: RDD[B])
882      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
883    zipPartitions(rdd2, preservesPartitioning = false)(f)
884  }
885
886  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
887      (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
888      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
889    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
890  }
891
892  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
893      (rdd2: RDD[B], rdd3: RDD[C])
894      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
895    zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
896  }
897
898  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
899      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
900      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
901    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
902  }
903
904  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
905      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
906      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
907    zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
908  }
909
910
911  // Actions (launch a job to return a value to the user program)
912
913  /**
914   * Applies a function f to all elements of this RDD.
915   */
916  def foreach(f: T => Unit): Unit = withScope {
917    val cleanF = sc.clean(f)
918    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
919  }
920
921  /**
922   * Applies a function f to each partition of this RDD.
923   */
924  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
925    val cleanF = sc.clean(f)
926    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
927  }
928
929  /**
930   * Return an array that contains all of the elements in this RDD.
931   *
932   * @note This method should only be used if the resulting array is expected to be small, as
933   * all the data is loaded into the driver's memory.
934   */
935  def collect(): Array[T] = withScope {
936    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
937    Array.concat(results: _*)
938  }
939
940  /**
941   * Return an iterator that contains all of the elements in this RDD.
942   *
943   * The iterator will consume as much memory as the largest partition in this RDD.
944   *
   * @note This results in multiple Spark jobs, and if the input RDD is the result
   * of a wide transformation (e.g. join with different partitioners), cache the input RDD
   * first to avoid recomputing it.
948   */
949  def toLocalIterator: Iterator[T] = withScope {
950    def collectPartition(p: Int): Array[T] = {
951      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
952    }
953    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
954  }
955
956  /**
957   * Return an RDD that contains all matching values by applying `f`.
958   */
959  def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
960    val cleanF = sc.clean(f)
961    filter(cleanF.isDefinedAt).map(cleanF)
962  }
963
964  /**
965   * Return an RDD with the elements from `this` that are not in `other`.
966   *
967   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
968   * RDD will be &lt;= us.
969   */
970  def subtract(other: RDD[T]): RDD[T] = withScope {
971    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
972  }
973
974  /**
975   * Return an RDD with the elements from `this` that are not in `other`.
976   */
977  def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
978    subtract(other, new HashPartitioner(numPartitions))
979  }
980
981  /**
982   * Return an RDD with the elements from `this` that are not in `other`.
983   */
984  def subtract(
985      other: RDD[T],
986      p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
987    if (partitioner == Some(p)) {
988      // Our partitioner knows how to handle T (which, since we have a partitioner, is
989      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
990      val p2 = new Partitioner() {
991        override def numPartitions: Int = p.numPartitions
992        override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
993      }
994      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
995      // anyway, and when calling .keys, will not have a partitioner set, even though
996      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
997      // partitioned by the right/real keys (e.g. p).
998      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
999    } else {
1000      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
1001    }
1002  }
1003
1004  /**
1005   * Reduces the elements of this RDD using the specified commutative and
1006   * associative binary operator.
1007   */
1008  def reduce(f: (T, T) => T): T = withScope {
1009    val cleanF = sc.clean(f)
1010    val reducePartition: Iterator[T] => Option[T] = iter => {
1011      if (iter.hasNext) {
1012        Some(iter.reduceLeft(cleanF))
1013      } else {
1014        None
1015      }
1016    }
1017    var jobResult: Option[T] = None
1018    val mergeResult = (index: Int, taskResult: Option[T]) => {
1019      if (taskResult.isDefined) {
1020        jobResult = jobResult match {
1021          case Some(value) => Some(f(value, taskResult.get))
1022          case None => taskResult
1023        }
1024      }
1025    }
1026    sc.runJob(this, reducePartition, mergeResult)
1027    // Get the final result out of our Option, or throw an exception if the RDD was empty
1028    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
1029  }
1030
1031  /**
1032   * Reduces the elements of this RDD in a multi-level tree pattern.
1033   *
1034   * @param depth suggested depth of the tree (default: 2)
1035   * @see [[org.apache.spark.rdd.RDD#reduce]]
1036   */
1037  def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
1038    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
1039    val cleanF = context.clean(f)
1040    val reducePartition: Iterator[T] => Option[T] = iter => {
1041      if (iter.hasNext) {
1042        Some(iter.reduceLeft(cleanF))
1043      } else {
1044        None
1045      }
1046    }
1047    val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
1048    val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
1049      if (c.isDefined && x.isDefined) {
1050        Some(cleanF(c.get, x.get))
1051      } else if (c.isDefined) {
1052        c
1053      } else if (x.isDefined) {
1054        x
1055      } else {
1056        None
1057      }
1058    }
1059    partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
1060      .getOrElse(throw new UnsupportedOperationException("empty collection"))
1061  }
1062
1063  /**
1064   * Aggregate the elements of each partition, and then the results for all the partitions, using a
1065   * given associative function and a neutral "zero value". The function
1066   * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
1067   * allocation; however, it should not modify t2.
1068   *
1069   * This behaves somewhat differently from fold operations implemented for non-distributed
1070   * collections in functional languages like Scala. This fold operation may be applied to
1071   * partitions individually, and then fold those results into the final result, rather than
1072   * apply the fold to each element sequentially in some defined ordering. For functions
1073   * that are not commutative, the result may differ from that of a fold applied to a
1074   * non-distributed collection.
1075   *
1076   * @param zeroValue the initial value for the accumulated result of each partition for the `op`
1077   *                  operator, and also the initial value for the combine results from different
1078   *                  partitions for the `op` operator - this will typically be the neutral
1079   *                  element (e.g. `Nil` for list concatenation or `0` for summation)
1080   * @param op an operator used to both accumulate results within a partition and combine results
1081   *                  from different partitions
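   *
   * For example, a sketch (assumes a SparkContext named `sc`):
   * {{{
   *   sc.parallelize(Seq(1, 2, 3, 4)).fold(0)(_ + _)   // 10; 0 is the neutral element
   * }}}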
1082   */
1083  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
1084    // Clone the zero value since we will also be serializing it as part of tasks
1085    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
1086    val cleanOp = sc.clean(op)
1087    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
1088    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
1089    sc.runJob(this, foldPartition, mergeResult)
1090    jobResult
1091  }
1092
1093  /**
1094   * Aggregate the elements of each partition, and then the results for all the partitions, using
1095   * given combine functions and a neutral "zero value". This function can return a different result
1096   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
1097   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
1098   * allowed to modify and return their first argument instead of creating a new U to avoid memory
1099   * allocation.
1100   *
1101   * @param zeroValue the initial value for the accumulated result of each partition for the
1102   *                  `seqOp` operator, and also the initial value for the combine results from
1103   *                  different partitions for the `combOp` operator - this will typically be the
1104   *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
1105   * @param seqOp an operator used to accumulate results within a partition
1106   * @param combOp an associative operator used to combine results from different partitions
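   *
   * A sketch that computes a (sum, count) pair in one pass (assumes a SparkContext named `sc`):
   * {{{
   *   val (sum, count) = sc.parallelize(Seq(1.0, 2.0, 3.0)).aggregate((0.0, 0L))(
   *     (acc, x) => (acc._1 + x, acc._2 + 1),    // merge one element into an accumulator
   *     (a, b) => (a._1 + b._1, a._2 + b._2))    // merge accumulators from different partitions
   *   val mean = sum / count
   * }}}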
1107   */
1108  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
1109    // Clone the zero value since we will also be serializing it as part of tasks
1110    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
1111    val cleanSeqOp = sc.clean(seqOp)
1112    val cleanCombOp = sc.clean(combOp)
1113    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
1114    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
1115    sc.runJob(this, aggregatePartition, mergeResult)
1116    jobResult
1117  }
1118
1119  /**
1120   * Aggregates the elements of this RDD in a multi-level tree pattern.
1121   *
1122   * @param depth suggested depth of the tree (default: 2)
1123   * @see [[org.apache.spark.rdd.RDD#aggregate]]
1124   */
1125  def treeAggregate[U: ClassTag](zeroValue: U)(
1126      seqOp: (U, T) => U,
1127      combOp: (U, U) => U,
1128      depth: Int = 2): U = withScope {
1129    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
1130    if (partitions.length == 0) {
1131      Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
1132    } else {
1133      val cleanSeqOp = context.clean(seqOp)
1134      val cleanCombOp = context.clean(combOp)
1135      val aggregatePartition =
1136        (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
1137      var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
1138      var numPartitions = partiallyAggregated.partitions.length
1139      val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
      // If creating an extra level doesn't reduce the wall-clock time, stop tree aggregation.
1144      while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
1145        numPartitions /= scale
1146        val curNumPartitions = numPartitions
1147        partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
1148          (i, iter) => iter.map((i % curNumPartitions, _))
1149        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
1150      }
1151      partiallyAggregated.reduce(cleanCombOp)
1152    }
1153  }
1154
1155  /**
1156   * Return the number of elements in the RDD.
1157   */
1158  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
1159
1160  /**
1161   * Approximate version of count() that returns a potentially incomplete result
1162   * within a timeout, even if not all tasks have finished.
1163   *
1164   * The confidence is the probability that the error bounds of the result will
1165   * contain the true value. That is, if countApprox were called repeatedly
1166   * with confidence 0.9, we would expect 90% of the results to contain the
1167   * true count. The confidence must be in the range [0,1] or an exception will
1168   * be thrown.
1169   *
1170   * @param timeout maximum time to wait for the job, in milliseconds
1171   * @param confidence the desired statistical confidence in the result
1172   * @return a potentially incomplete result, with error bounds
1173   */
1174  def countApprox(
1175      timeout: Long,
1176      confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
1177    require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
1178    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
1179      var result = 0L
1180      while (iter.hasNext) {
1181        result += 1L
1182        iter.next()
1183      }
1184      result
1185    }
1186    val evaluator = new CountEvaluator(partitions.length, confidence)
1187    sc.runApproximateJob(this, countElements, evaluator, timeout)
1188  }
1189
1190  /**
1191   * Return the count of each unique value in this RDD as a local map of (value, count) pairs.
1192   *
1193   * @note This method should only be used if the resulting map is expected to be small, as
1194   * the whole thing is loaded into the driver's memory.
1195   * To handle very large results, consider using
1196   *
1197   * {{{
1198   * rdd.map(x => (x, 1L)).reduceByKey(_ + _)
1199   * }}}
1200   *
   * which returns an RDD[(T, Long)] instead of a map.
1202   */
1203  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
1204    map(value => (value, null)).countByKey()
1205  }
1206
1207  /**
1208   * Approximate version of countByValue().
1209   *
1210   * @param timeout maximum time to wait for the job, in milliseconds
1211   * @param confidence the desired statistical confidence in the result
1212   * @return a potentially incomplete result, with error bounds
1213   */
1214  def countByValueApprox(timeout: Long, confidence: Double = 0.95)
1215      (implicit ord: Ordering[T] = null)
1216      : PartialResult[Map[T, BoundedDouble]] = withScope {
1217    require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
1218    if (elementClassTag.runtimeClass.isArray) {
1219      throw new SparkException("countByValueApprox() does not support arrays")
1220    }
1221    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) =>
1222      val map = new OpenHashMap[T, Long]
1223      iter.foreach {
1224        t => map.changeValue(t, 1L, _ + 1L)
1225      }
1226      map
1227    }
1228    val evaluator = new GroupedCountEvaluator[T](partitions.length, confidence)
1229    sc.runApproximateJob(this, countPartition, evaluator, timeout)
1230  }
1231
1232  /**
1233   * Return approximate number of distinct elements in the RDD.
1234   *
1235   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
1236   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
1237   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
1238   *
   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp` (greater
   * than `p`) triggers a sparse representation of the registers, which may reduce memory
   * consumption and increase accuracy when the cardinality is small.
1242   *
1243   * @param p The precision value for the normal set.
1244   *          `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
1245   * @param sp The precision value for the sparse set, between 0 and 32.
1246   *           If `sp` equals 0, the sparse representation is skipped.
1247   */
1248  def countApproxDistinct(p: Int, sp: Int): Long = withScope {
1249    require(p >= 4, s"p ($p) must be >= 4")
1250    require(sp <= 32, s"sp ($sp) must be <= 32")
1251    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
1252    val zeroCounter = new HyperLogLogPlus(p, sp)
1253    aggregate(zeroCounter)(
1254      (hll: HyperLogLogPlus, v: T) => {
1255        hll.offer(v)
1256        hll
1257      },
1258      (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
1259        h1.addAll(h2)
1260        h1
1261      }).cardinality()
1262  }
1263
1264  /**
1265   * Return approximate number of distinct elements in the RDD.
1266   *
1267   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
1268   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
1269   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
1270   *
1271   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
1272   *                   It must be greater than 0.000017.
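   *
   * For example, a sketch (assumes an existing RDD `rdd`):
   * {{{
   *   rdd.countApproxDistinct(0.01)   // roughly 1% relative error (the default is 0.05)
   * }}}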
1273   */
1274  def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
1275    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
1276    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
1277    countApproxDistinct(if (p < 4) 4 else p, 0)
1278  }
1279
1280  /**
1281   * Zips this RDD with its element indices. The ordering is first based on the partition index
1282   * and then the ordering of items within each partition. So the first item in the first
1283   * partition gets index 0, and the last item in the last partition receives the largest index.
1284   *
   * This is similar to Scala's zipWithIndex, but it uses Long instead of Int as the index type.
   * This method needs to trigger a Spark job when this RDD contains more than one partition.
1287   *
1288   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
1289   * elements in a partition. The index assigned to each element is therefore not guaranteed,
1290   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
1291   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
1292   */
1293  def zipWithIndex(): RDD[(T, Long)] = withScope {
1294    new ZippedWithIndexRDD(this)
1295  }
1296
1297  /**
   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
   * 2*n+k, ..., where n is the number of partitions. So the ids may have gaps, but this method
   * won't trigger a Spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
1301   *
1302   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
1303   * elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
1304   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
1305   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
1306   */
1307  def zipWithUniqueId(): RDD[(T, Long)] = withScope {
1308    val n = this.partitions.length.toLong
1309    this.mapPartitionsWithIndex { case (k, iter) =>
1310      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
1311        (item, i * n + k)
1312      }
1313    }
1314  }
1315
1316  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and using
   * the results from that partition to estimate the number of additional partitions needed to
   * satisfy the limit.
1320   *
1321   * @note This method should only be used if the resulting array is expected to be small, as
1322   * all the data is loaded into the driver's memory.
1323   *
1324   * @note Due to complications in the internal implementation, this method will raise
1325   * an exception if called on an RDD of `Nothing` or `Null`.
1326   */
1327  def take(num: Int): Array[T] = withScope {
1328    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
1329    if (num == 0) {
1330      new Array[T](0)
1331    } else {
1332      val buf = new ArrayBuffer[T]
1333      val totalParts = this.partitions.length
1334      var partsScanned = 0
1335      while (buf.size < num && partsScanned < totalParts) {
1336        // The number of partitions to try in this iteration. It is ok for this number to be
1337        // greater than totalParts because we actually cap it at totalParts in runJob.
1338        var numPartsToTry = 1L
1339        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, multiply the number of
          // partitions to scan by scaleUpFactor and retry. Otherwise, interpolate the number of
          // partitions we need to try, but overestimate it by 50%. We also cap the estimation
          // in the end.
1343          if (buf.isEmpty) {
1344            numPartsToTry = partsScanned * scaleUpFactor
1345          } else {
1346            // the left side of max is >=1 whenever partsScanned >= 2
1347            numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
1348            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
1349          }
1350        }
1351
1352        val left = num - buf.size
1353        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
1354        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
1355
1356        res.foreach(buf ++= _.take(num - buf.size))
1357        partsScanned += p.size
1358      }
1359
1360      buf.toArray
1361    }
1362  }
1363
1364  /**
1365   * Return the first element in this RDD.
1366   */
1367  def first(): T = withScope {
1368    take(1) match {
1369      case Array(t) => t
1370      case _ => throw new UnsupportedOperationException("empty collection")
1371    }
1372  }
1373
1374  /**
1375   * Returns the top k (largest) elements from this RDD as defined by the specified
1376   * implicit Ordering[T] and maintains the ordering. This does the opposite of
1377   * [[takeOrdered]]. For example:
1378   * {{{
1379   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
1380   *   // returns Array(12)
1381   *
1382   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
1383   *   // returns Array(6, 5)
1384   * }}}
1385   *
1386   * @note This method should only be used if the resulting array is expected to be small, as
1387   * all the data is loaded into the driver's memory.
1388   *
1389   * @param num k, the number of top elements to return
1390   * @param ord the implicit ordering for T
1391   * @return an array of top elements
1392   */
1393  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
1394    takeOrdered(num)(ord.reverse)
1395  }
1396
1397  /**
1398   * Returns the first k (smallest) elements from this RDD as defined by the specified
1399   * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
1400   * For example:
1401   * {{{
1402   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
1403   *   // returns Array(2)
1404   *
1405   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
1406   *   // returns Array(2, 3)
1407   * }}}
1408   *
1409   * @note This method should only be used if the resulting array is expected to be small, as
1410   * all the data is loaded into the driver's memory.
1411   *
1412   * @param num k, the number of elements to return
1413   * @param ord the implicit ordering for T
1414   * @return an array of top elements
1415   */
1416  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
1417    if (num == 0) {
1418      Array.empty
1419    } else {
1420      val mapRDDs = mapPartitions { items =>
1421        // Priority keeps the largest elements, so let's reverse the ordering.
1422        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
1423        queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
1424        Iterator.single(queue)
1425      }
1426      if (mapRDDs.partitions.length == 0) {
1427        Array.empty
1428      } else {
1429        mapRDDs.reduce { (queue1, queue2) =>
1430          queue1 ++= queue2
1431          queue1
1432        }.toArray.sorted(ord)
1433      }
1434    }
1435  }
1436
1437  /**
1438   * Returns the max of this RDD as defined by the implicit Ordering[T].
1439   * @return the maximum element of the RDD
1440   * */
1441  def max()(implicit ord: Ordering[T]): T = withScope {
1442    this.reduce(ord.max)
1443  }
1444
1445  /**
1446   * Returns the min of this RDD as defined by the implicit Ordering[T].
1447   * @return the minimum element of the RDD
1448   * */
1449  def min()(implicit ord: Ordering[T]): T = withScope {
1450    this.reduce(ord.min)
1451  }
1452
1453  /**
1454   * @note Due to complications in the internal implementation, this method will raise an
   * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
1456   * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
1457   * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
1458   * @return true if and only if the RDD contains no elements at all. Note that an RDD
1459   *         may be empty even when it has at least 1 partition.
1460   */
1461  def isEmpty(): Boolean = withScope {
1462    partitions.length == 0 || take(1).length == 0
1463  }
1464
1465  /**
1466   * Save this RDD as a text file, using string representations of elements.
1467   */
1468  def saveAsTextFile(path: String): Unit = withScope {
1469    // https://issues.apache.org/jira/browse/SPARK-2075
1470    //
1471    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
1472    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
1473    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
1474    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
1475    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
1476    //
    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generates
    // the same bytecode for `saveAsTextFile`.
1479    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
1480    val textClassTag = implicitly[ClassTag[Text]]
1481    val r = this.mapPartitions { iter =>
1482      val text = new Text()
1483      iter.map { x =>
1484        text.set(x.toString)
1485        (NullWritable.get(), text)
1486      }
1487    }
1488    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
1489      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
1490  }
1491
1492  /**
1493   * Save this RDD as a compressed text file, using string representations of elements.
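   *
   * For example, an illustrative sketch (the output path is hypothetical):
   * {{{
   *   import org.apache.hadoop.io.compress.GzipCodec
   *   sc.parallelize(Seq("one", "two")).saveAsTextFile("/tmp/output", classOf[GzipCodec])
   * }}}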
1494   */
1495  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
1496    // https://issues.apache.org/jira/browse/SPARK-2075
1497    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
1498    val textClassTag = implicitly[ClassTag[Text]]
1499    val r = this.mapPartitions { iter =>
1500      val text = new Text()
1501      iter.map { x =>
1502        text.set(x.toString)
1503        (NullWritable.get(), text)
1504      }
1505    }
1506    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
1507      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
1508  }
1509
1510  /**
1511   * Save this RDD as a SequenceFile of serialized objects.
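   *
   * For example, an illustrative sketch (the path is hypothetical):
   * {{{
   *   sc.parallelize(1 to 3).saveAsObjectFile("/tmp/objects")
   *   sc.objectFile[Int]("/tmp/objects").collect()  // returns Array(1, 2, 3), possibly reordered
   * }}}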
1512   */
1513  def saveAsObjectFile(path: String): Unit = withScope {
1514    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
1515      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
1516      .saveAsSequenceFile(path)
1517  }
1518
1519  /**
1520   * Creates tuples of the elements in this RDD by applying `f`.
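   *
   * For example, an illustrative sketch:
   * {{{
   *   sc.parallelize(Seq("apple", "kiwi")).keyBy(_.length).collect()
   *   // returns Array((5,apple), (4,kiwi))
   * }}}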
1521   */
1522  def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
1523    val cleanedF = sc.clean(f)
1524    map(x => (cleanedF(x), x))
1525  }
1526
1527  /** A private method for tests, to look at the contents of each partition */
1528  private[spark] def collectPartitions(): Array[Array[T]] = withScope {
1529    sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
1530  }
1531
1532  /**
1533   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
1534   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
1535   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it to a file will require recomputation.
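   *
   * For example, an illustrative sketch (the checkpoint directory is hypothetical):
   * {{{
   *   sc.setCheckpointDir("/tmp/checkpoints")
   *   val rdd = sc.parallelize(1 to 100).map(_ + 1).persist()
   *   rdd.checkpoint()
   *   rdd.count()  // the first job materializes the RDD and writes the checkpoint
   * }}}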
1538   */
1539  def checkpoint(): Unit = RDDCheckpointData.synchronized {
1540    // NOTE: we use a global lock here due to complexities downstream with ensuring
1541    // children RDD partitions point to the correct parent partitions. In the future
1542    // we should revisit this consideration.
1543    if (context.checkpointDir.isEmpty) {
1544      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
1545    } else if (checkpointData.isEmpty) {
1546      checkpointData = Some(new ReliableRDDCheckpointData(this))
1547    }
1548  }
1549
1550  /**
1551   * Mark this RDD for local checkpointing using Spark's existing caching layer.
1552   *
1553   * This method is for users who wish to truncate RDD lineages while skipping the expensive
1554   * step of replicating the materialized data in a reliable distributed file system. This is
1555   * useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
1556   *
1557   * Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
1558   * data is written to ephemeral local storage in the executors instead of to a reliable,
1559   * fault-tolerant storage. The effect is that if an executor fails during the computation,
1560   * the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
1561   *
1562   * This is NOT safe to use with dynamic allocation, which removes executors along
1563   * with their cached blocks. If you must use both features, you are advised to set
1564   * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
1565   *
1566   * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
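   *
   * For example, an illustrative sketch:
   * {{{
   *   val rdd = sc.parallelize(1 to 100).map(_ * 2).localCheckpoint()
   *   rdd.count()  // materializes the RDD and truncates its lineage using executor-local storage
   * }}}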
1567   */
1568  def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
1569    if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
1570        conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
1571      logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
1572        "which removes executors along with their cached blocks. If you must use both " +
1573        "features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +
1574        "to a high value. E.g. If you plan to use the RDD for 1 hour, set the timeout to " +
1575        "at least 1 hour.")
1576    }
1577
1578    // Note: At this point we do not actually know whether the user will call persist() on
1579    // this RDD later, so we must explicitly call it here ourselves to ensure the cached
1580    // blocks are registered for cleanup later in the SparkContext.
1581    //
1582    // If, however, the user has already called persist() on this RDD, then we must adapt
    // the storage level they specified to one that is appropriate for local checkpointing
1584    // (i.e. uses disk) to guarantee correctness.
1585
1586    if (storageLevel == StorageLevel.NONE) {
1587      persist(LocalRDDCheckpointData.DEFAULT_STORAGE_LEVEL)
1588    } else {
1589      persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
1590    }
1591
1592    // If this RDD is already checkpointed and materialized, its lineage is already truncated.
1593    // We must not override our `checkpointData` in this case because it is needed to recover
    // the checkpointed data. If it is overridden, the next time this RDD is materialized it
    // will cause an error.
1596    if (isCheckpointedAndMaterialized) {
1597      logWarning("Not marking RDD for local checkpoint because it was already " +
1598        "checkpointed and materialized")
1599    } else {
1600      // Lineage is not truncated yet, so just override any existing checkpoint data with ours
1601      checkpointData match {
1602        case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
1603          "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
1604        case _ =>
1605      }
1606      checkpointData = Some(new LocalRDDCheckpointData(this))
1607    }
1608    this
1609  }
1610
1611  /**
1612   * Return whether this RDD is checkpointed and materialized, either reliably or locally.
1613   */
1614  def isCheckpointed: Boolean = isCheckpointedAndMaterialized
1615
1616  /**
1617   * Return whether this RDD is checkpointed and materialized, either reliably or locally.
1618   * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
1619   * return value. Exposed for testing.
1620   */
1621  private[spark] def isCheckpointedAndMaterialized: Boolean =
1622    checkpointData.exists(_.isCheckpointed)
1623
1624  /**
1625   * Return whether this RDD is marked for local checkpointing.
1626   * Exposed for testing.
1627   */
1628  private[rdd] def isLocallyCheckpointed: Boolean = {
1629    checkpointData match {
1630      case Some(_: LocalRDDCheckpointData[T]) => true
1631      case _ => false
1632    }
1633  }
1634
1635  /**
1636   * Gets the name of the directory to which this RDD was checkpointed.
1637   * This is not defined if the RDD is checkpointed locally.
1638   */
1639  def getCheckpointFile: Option[String] = {
1640    checkpointData match {
1641      case Some(reliable: ReliableRDDCheckpointData[T]) => reliable.getCheckpointDir
1642      case _ => None
1643    }
1644  }
1645
1646  // =======================================================================
1647  // Other internal methods and fields
1648  // =======================================================================
1649
1650  private var storageLevel: StorageLevel = StorageLevel.NONE
1651
1652  /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
1653  @transient private[spark] val creationSite = sc.getCallSite()
1654
1655  /**
1656   * The scope associated with the operation that created this RDD.
1657   *
1658   * This is more flexible than the call site and can be defined hierarchically. For more
1659   * detail, see the documentation of {{RDDOperationScope}}. This scope is not defined if the
   * user instantiates this RDD directly without using any Spark operations.
1661   */
1662  @transient private[spark] val scope: Option[RDDOperationScope] = {
1663    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
1664  }
1665
1666  private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
1667
1668  private[spark] def elementClassTag: ClassTag[T] = classTag[T]
1669
1670  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
1671
1672  // Whether to checkpoint all ancestor RDDs that are marked for checkpointing. By default,
1673  // we stop as soon as we find the first such RDD, an optimization that allows us to write
1674  // less data but is not safe for all workloads. E.g. in streaming we may checkpoint both
1675  // an RDD and its parent in every batch, in which case the parent may never be checkpointed
1676  // and its lineage never truncated, leading to OOMs in the long run (SPARK-6847).
1677  private val checkpointAllMarkedAncestors =
1678    Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS))
1679      .map(_.toBoolean).getOrElse(false)
1680
1681  /** Returns the first parent RDD */
1682  protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
1683    dependencies.head.rdd.asInstanceOf[RDD[U]]
1684  }
1685
1686  /** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */
1687  protected[spark] def parent[U: ClassTag](j: Int) = {
1688    dependencies(j).rdd.asInstanceOf[RDD[U]]
1689  }
1690
1691  /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
1692  def context: SparkContext = sc
1693
1694  /**
1695   * Private API for changing an RDD's ClassTag.
1696   * Used for internal Java-Scala API compatibility.
1697   */
1698  private[spark] def retag(cls: Class[T]): RDD[T] = {
1699    val classTag: ClassTag[T] = ClassTag.apply(cls)
1700    this.retag(classTag)
1701  }
1702
1703  /**
1704   * Private API for changing an RDD's ClassTag.
1705   * Used for internal Java-Scala API compatibility.
1706   */
1707  private[spark] def retag(implicit classTag: ClassTag[T]): RDD[T] = {
1708    this.mapPartitions(identity, preservesPartitioning = true)(classTag)
1709  }
1710
1711  // Avoid handling doCheckpoint multiple times to prevent excessive recursion
1712  @transient private var doCheckpointCalled = false
1713
1714  /**
   * Performs the checkpointing of this RDD by saving its contents. It is called after a job
   * using this RDD has completed (therefore the RDD has been materialized and potentially
   * stored in memory). doCheckpoint() is called recursively on the parent RDDs.
1718   */
1719  private[spark] def doCheckpoint(): Unit = {
1720    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
1721      if (!doCheckpointCalled) {
1722        doCheckpointCalled = true
1723        if (checkpointData.isDefined) {
1724          if (checkpointAllMarkedAncestors) {
            // TODO We can collect all the RDDs that need to be checkpointed, and then checkpoint
1726            // them in parallel.
1727            // Checkpoint parents first because our lineage will be truncated after we
1728            // checkpoint ourselves
1729            dependencies.foreach(_.rdd.doCheckpoint())
1730          }
1731          checkpointData.get.checkpoint()
1732        } else {
1733          dependencies.foreach(_.rdd.doCheckpoint())
1734        }
1735      }
1736    }
1737  }
1738
1739  /**
   * Changes the dependencies of this RDD from its original parents to a new RDD created from
   * the checkpoint file, and forgets its old dependencies and partitions.
1742   */
1743  private[spark] def markCheckpointed(): Unit = {
1744    clearDependencies()
1745    partitions_ = null
1746    deps = null    // Forget the constructor argument for dependencies too
1747  }
1748
1749  /**
1750   * Clears the dependencies of this RDD. This method must ensure that all references
1751   * to the original parent RDDs are removed to enable the parent RDDs to be garbage
1752   * collected. Subclasses of RDD may override this method for implementing their own cleaning
1753   * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
1754   */
1755  protected def clearDependencies() {
1756    dependencies_ = null
1757  }
1758
1759  /** A description of this RDD and its recursive dependencies for debugging. */
1760  def toDebugString: String = {
1761    // Get a debug description of an rdd without its children
1762    def debugSelf(rdd: RDD[_]): Seq[String] = {
1763      import Utils.bytesToString
1764
1765      val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
1766      val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info =>
1767        "    CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
1768          info.numCachedPartitions, bytesToString(info.memSize),
1769          bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
1770
1771      s"$rdd [$persistence]" +: storageInfo
1772    }
1773
1774    // Apply a different rule to the last child
1775    def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
1776      val len = rdd.dependencies.length
1777      len match {
1778        case 0 => Seq.empty
1779        case 1 =>
1780          val d = rdd.dependencies.head
1781          debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]], true)
1782        case _ =>
1783          val frontDeps = rdd.dependencies.take(len - 1)
1784          val frontDepStrings = frontDeps.flatMap(
1785            d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]]))
1786
1787          val lastDep = rdd.dependencies.last
1788          val lastDepStrings =
1789            debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_, _, _]], true)
1790
1791          (frontDepStrings ++ lastDepStrings)
1792      }
1793    }
1794    // The first RDD in the dependency stack has no parents, so no need for a +-
1795    def firstDebugString(rdd: RDD[_]): Seq[String] = {
1796      val partitionStr = "(" + rdd.partitions.length + ")"
1797      val leftOffset = (partitionStr.length - 1) / 2
1798      val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
1799
1800      debugSelf(rdd).zipWithIndex.map{
1801        case (desc: String, 0) => s"$partitionStr $desc"
1802        case (desc: String, _) => s"$nextPrefix $desc"
1803      } ++ debugChildren(rdd, nextPrefix)
1804    }
1805    def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
1806      val partitionStr = "(" + rdd.partitions.length + ")"
1807      val leftOffset = (partitionStr.length - 1) / 2
1808      val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
1809      val nextPrefix = (
1810        thisPrefix
1811        + (if (isLastChild) "  " else "| ")
1812        + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
1813
1814      debugSelf(rdd).zipWithIndex.map{
1815        case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc"
1816        case (desc: String, _) => s"$nextPrefix$desc"
1817      } ++ debugChildren(rdd, nextPrefix)
1818    }
1819    def debugString(
1820        rdd: RDD[_],
1821        prefix: String = "",
1822        isShuffle: Boolean = true,
1823        isLastChild: Boolean = false): Seq[String] = {
1824      if (isShuffle) {
1825        shuffleDebugString(rdd, prefix, isLastChild)
1826      } else {
1827        debugSelf(rdd).map(prefix + _) ++ debugChildren(rdd, prefix)
1828      }
1829    }
1830    firstDebugString(this).mkString("\n")
1831  }
1832
1833  override def toString: String = "%s%s[%d] at %s".format(
1834    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
1835
1836  def toJavaRDD() : JavaRDD[T] = {
1837    new JavaRDD(this)(elementClassTag)
1838  }
1839}
1840
1841
1842/**
 * Defines implicit functions that provide extra functionality on RDDs of specific types.
 *
 * For example, [[RDD.rddToPairRDDFunctions]] converts an RDD into a [[PairRDDFunctions]] for
 * key-value-pair RDDs, enabling extra functionality such as [[PairRDDFunctions.reduceByKey]].
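 *
 * For example, an illustrative sketch (assuming `sc` is an active SparkContext):
 * {{{
 *   val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
 *   pairs.reduceByKey(_ + _).collect()  // returns Array((a,3), (b,1)); no explicit import needed
 * }}}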
1847 */
1848object RDD {
1849
1850  private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS =
1851    "spark.checkpoint.checkpointAllMarkedAncestors"
1852
1853  // The following implicit functions were in SparkContext before 1.3 and users had to
1854  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
1855  // them automatically. However, we still keep the old functions in SparkContext for backward
1856  // compatibility and forward to the following functions directly.
1857
1858  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
1859    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
1860    new PairRDDFunctions(rdd)
1861  }
1862
1863  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
1864    new AsyncRDDActions(rdd)
1865  }
1866
1867  implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
1868      (implicit kt: ClassTag[K], vt: ClassTag[V],
1869                keyWritableFactory: WritableFactory[K],
1870                valueWritableFactory: WritableFactory[V])
1871    : SequenceFileRDDFunctions[K, V] = {
1872    implicit val keyConverter = keyWritableFactory.convert
1873    implicit val valueConverter = valueWritableFactory.convert
1874    new SequenceFileRDDFunctions(rdd,
1875      keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
1876  }
1877
1878  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
1879    : OrderedRDDFunctions[K, V, (K, V)] = {
1880    new OrderedRDDFunctions[K, V, (K, V)](rdd)
1881  }
1882
1883  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = {
1884    new DoubleRDDFunctions(rdd)
1885  }
1886
1887  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T])
1888    : DoubleRDDFunctions = {
1889    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
1890  }
1891}
1892