/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.java

import java.{lang => jl}
import java.lang.{Iterable => JIterable}
import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark._
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
 * As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
 * of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
 * from the trait. See SPARK-3266 for additional details.
 */
private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
  extends JavaRDDLike[T, This]

/**
 * Defines operations common to several Java RDD implementations.
 *
 * @note This trait is not intended to be implemented by user code.
 */
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  def wrapRDD(rdd: RDD[T]): This

  implicit val classTag: ClassTag[T]

  def rdd: RDD[T]

  /** Set of partitions in this RDD. */
  def partitions: JList[Partition] = rdd.partitions.toSeq.asJava

  /** Return the number of partitions in this RDD. */
  @Since("1.6.0")
  def getNumPartitions: Int = rdd.getNumPartitions

  /** The partitioner of this RDD. */
  def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner)

  /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
  def context: SparkContext = rdd.context

  /** A unique ID for this RDD (within its SparkContext). */
  def id: Int = rdd.id

  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
  def getStorageLevel: StorageLevel = rdd.getStorageLevel

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
    rdd.iterator(split, taskContext).asJava

  // Transformations (return a new RDD)

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[R](f: JFunction[T, R]): JavaRDD[R] =
    new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)

  /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the
   * index of the original partition.
   */
  def mapPartitionsWithIndex[R](
      f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
      preservesPartitioning: Boolean = false): JavaRDD[R] =
    new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
        preservesPartitioning)(fakeClassTag))(fakeClassTag)
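
  // A minimal usage sketch for `mapPartitionsWithIndex` above, assuming a hypothetical
  // `lines: JavaRDD[String]` and a Scala caller with scala.collection.JavaConverters._ in scope:
  //   val tagged: JavaRDD[String] = lines.mapPartitionsWithIndex(
  //     (idx, it) => it.asScala.map(s => s"$idx: $s").asJava,
  //     false)
  //   // every element is tagged with the index of the partition it came from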

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
    new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
  }

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
    new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
    def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
    JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
  }
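
  // A minimal usage sketch for `flatMap` above, assuming a hypothetical `lines: JavaRDD[String]`
  // and a Scala caller with scala.collection.JavaConverters._ in scope:
  //   val words: JavaRDD[String] = lines.flatMap(line => line.split(" ").iterator.asJava)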

  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
    def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
    new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue()))
  }

  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
    def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
    JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
    def fn: (Iterator[T]) => Iterator[U] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
      preservesPartitioning: Boolean): JavaRDD[U] = {
    def fn: (Iterator[T]) => Iterator[U] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaRDD.fromRDD(
      rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
    def fn: (Iterator[T]) => Iterator[jl.Double] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue()))
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitionsToPair[K2, V2](
      f: PairFlatMapFunction[JIterator[T], K2, V2]): JavaPairRDD[K2, V2] = {
    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
      preservesPartitioning: Boolean): JavaDoubleRDD = {
    def fn: (Iterator[T]) => Iterator[jl.Double] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
      .map(_.doubleValue()))
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
      preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaPairRDD.fromRDD(
      rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
    rdd.foreachPartition(x => f.call(x.asJava))
  }

  /**
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): JavaRDD[JList[T]] =
    new JavaRDD(rdd.glom().map(_.toSeq.asJava))

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] =
    JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classTag))(classTag, other.classTag)

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key.
   */
  def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
    // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
    implicit val ctagK: ClassTag[U] = fakeClassTag
    implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
  }
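
  // A minimal usage sketch for `groupBy` above, assuming a hypothetical `words: JavaRDD[String]`
  // and a Scala caller:
  //   val byFirstLetter: JavaPairRDD[String, JIterable[String]] =
  //     words.groupBy(w => w.substring(0, 1))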

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key.
   */
  def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
    // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
    implicit val ctagK: ClassTag[U] = fakeClassTag
    implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: String): JavaRDD[String] = {
    rdd.pipe(command)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String]): JavaRDD[String] = {
    rdd.pipe(command.asScala)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
    rdd.pipe(command.asScala, env.asScala)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String],
           env: JMap[String, String],
           separateWorkingDir: Boolean,
           bufferSize: Int): JavaRDD[String] = {
    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String],
           env: JMap[String, String],
           separateWorkingDir: Boolean,
           bufferSize: Int,
           encoding: String): JavaRDD[String] = {
    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
  }
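
  // A minimal usage sketch for the `pipe` overloads above, assuming a hypothetical
  // `lines: JavaRDD[String]`: each element is written to the external process's stdin, one per
  // line, and every line the process prints to stdout becomes an element of the result.
  //   val matching: JavaRDD[String] = lines.pipe("grep -i error")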

  /**
   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
   * partitions* and the *same number of elements in each partition* (e.g. one was made through
   * a map on the other).
   */
  def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
    JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
  }
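
  // A minimal usage sketch for `zip` above, assuming hypothetical `ids: JavaRDD[jl.Long]` and
  // `names: JavaRDD[String]` where `names` was derived from `ids` by a map (so both RDDs have the
  // same partitioning and per-partition element counts):
  //   val pairs: JavaPairRDD[jl.Long, String] = ids.zip(names)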

  /**
   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
   * applying a function to the zipped partitions. Assumes that all the RDDs have the
   * *same number of partitions*, but does *not* require them to have the same number
   * of elements in each partition.
   */
  def zipPartitions[U, V](
      other: JavaRDDLike[U, _],
      f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
    }
    JavaRDD.fromRDD(
      rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
  }

  /**
   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
   * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
   */
  def zipWithUniqueId(): JavaPairRDD[T, jl.Long] = {
    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, jl.Long]]
  }
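
  // Worked sketch of the id assignment for `zipWithUniqueId` above, for a hypothetical RDD with
  // n = 3 partitions:
  //   partition 0 -> ids 0, 3, 6, ...
  //   partition 1 -> ids 1, 4, 7, ...
  //   partition 2 -> ids 2, 5, 8, ...
  // Ids are unique but may have gaps, and no Spark job is needed to compute partition sizes.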

  /**
   * Zips this RDD with its element indices. The ordering is first based on the partition index
   * and then the ordering of items within each partition. So the first item in the first
   * partition gets index 0, and the last item in the last partition receives the largest index.
   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
   * This method needs to trigger a spark job when this RDD contains more than one partition.
   */
  def zipWithIndex(): JavaPairRDD[T, jl.Long] = {
    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, jl.Long]]
  }
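
  // Worked sketch for `zipWithIndex` above, for the same hypothetical 3-partition RDD as
  // `zipWithUniqueId`: indices run 0, 1, 2, ... contiguously across partition 0, then partition 1,
  // and so on, which is why a job is needed to size every partition except the last.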

  // Actions (launch a job to return a value to the user program)

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: VoidFunction[T]): Unit = {
    rdd.foreach(x => f.call(x))
  }

  /**
   * Return an array that contains all of the elements in this RDD.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collect(): JList[T] =
    rdd.collect().toSeq.asJava

  /**
   * Return an iterator that contains all of the elements in this RDD.
   *
   * The iterator will consume as much memory as the largest partition in this RDD.
   */
  def toLocalIterator(): JIterator[T] =
    asJavaIteratorConverter(rdd.toLocalIterator).asJava

  /**
   * Return an array that contains all of the elements in a specific partition of this RDD.
   */
  def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
    // This is useful for implementing `take` from other language frontends
    // like Python where the data is serialized.
    val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds)
    res.map(_.toSeq.asJava)
  }

  /**
   * Reduces the elements of this RDD using the specified commutative and associative binary
   * operator.
   */
  def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)

  /**
   * Reduces the elements of this RDD in a multi-level tree pattern.
   *
   * @param depth suggested depth of the tree
   * @see [[org.apache.spark.api.java.JavaRDDLike#reduce]]
   */
  def treeReduce(f: JFunction2[T, T, T], depth: Int): T = rdd.treeReduce(f, depth)

  /**
   * [[org.apache.spark.api.java.JavaRDDLike#treeReduce]] with suggested depth 2.
   */
  def treeReduce(f: JFunction2[T, T, T]): T = treeReduce(f, 2)

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function
   * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
   * allocation; however, it should not modify t2.
   *
   * This behaves somewhat differently from fold operations implemented for non-distributed
   * collections in functional languages like Scala. This fold operation may be applied to
   * partitions individually, and then fold those results into the final result, rather than
   * apply the fold to each element sequentially in some defined ordering. For functions
   * that are not commutative, the result may differ from that of a fold applied to a
   * non-distributed collection.
   */
  def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
    rdd.fold(zeroValue)(f)
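
  // A minimal sketch for `fold` above, assuming a hypothetical `nums: JavaRDD[jl.Integer]` and a
  // Scala caller:
  //   val sum: jl.Integer = nums.fold(0)((a, b) => a + b)
  // The zero value is folded into each partition's result and again into the final merge, so it
  // should be neutral for the operator (e.g. 0 for addition, 1 for multiplication).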

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * given combine functions and a neutral "zero value". This function can return a different
   * result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T
   * into a U and one operation for merging two U's, as in scala.TraversableOnce. Both of these
   * functions are allowed to modify and return their first argument instead of creating a new U
   * to avoid memory allocation.
   */
  def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
    combOp: JFunction2[U, U, U]): U =
    rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])
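
  // A minimal sketch for `aggregate` above, assuming a hypothetical `words: JavaRDD[String]` and
  // a Scala caller, computing the total number of characters as a jl.Long even though the element
  // type is String:
  //   val totalChars: jl.Long = words.aggregate(jl.Long.valueOf(0L))(
  //     (acc, w) => acc + w.length,   // seqOp: fold one element into a partial result
  //     (a, b) => a + b)              // combOp: merge two partial results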

  /**
   * Aggregates the elements of this RDD in a multi-level tree pattern.
   *
   * @param depth suggested depth of the tree
   * @see [[org.apache.spark.api.java.JavaRDDLike#aggregate]]
   */
  def treeAggregate[U](
      zeroValue: U,
      seqOp: JFunction2[U, T, U],
      combOp: JFunction2[U, U, U],
      depth: Int): U = {
    rdd.treeAggregate(zeroValue)(seqOp, combOp, depth)(fakeClassTag[U])
  }

  /**
   * [[org.apache.spark.api.java.JavaRDDLike#treeAggregate]] with suggested depth 2.
   */
  def treeAggregate[U](
      zeroValue: U,
      seqOp: JFunction2[U, T, U],
      combOp: JFunction2[U, U, U]): U = {
    treeAggregate(zeroValue, seqOp, combOp, 2)
  }

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = rdd.count()

  /**
   * Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
    rdd.countApprox(timeout, confidence)
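
  // A minimal sketch for `countApprox` above, assuming a hypothetical `events: JavaRDD[String]`:
  // wait at most 10 seconds for the count and accept 95% confidence bounds.
  //   val approx: PartialResult[BoundedDouble] = events.countApprox(10000L, 0.95)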

  /**
   * Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   */
  def countApprox(timeout: Long): PartialResult[BoundedDouble] =
    rdd.countApprox(timeout)

  /**
   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
   * combine step happens locally on the master, equivalent to running a single reduce task.
   */
  def countByValue(): JMap[T, jl.Long] =
    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]
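
  // A minimal sketch for `countByValue` above, assuming a hypothetical `words: JavaRDD[String]`:
  // the whole (value -> count) map is materialized on the driver, so the number of distinct
  // values should be small.
  //   val counts: JMap[String, jl.Long] = words.countByValue()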

  /**
   * Approximate version of countByValue().
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countByValueApprox(
      timeout: Long,
      confidence: Double): PartialResult[JMap[T, BoundedDouble]] =
    rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)

  /**
   * Approximate version of countByValue().
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @return a potentially incomplete result, with error bounds
   */
  def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
    rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)

  /**
   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
   * whole RDD instead.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def take(num: Int): JList[T] =
    rdd.take(num).toSeq.asJava

  /** Return a fixed-size sampled subset of this RDD, using a random seed. */
  def takeSample(withReplacement: Boolean, num: Int): JList[T] =
    takeSample(withReplacement, num, Utils.random.nextLong)

  /** Return a fixed-size sampled subset of this RDD, using the given random seed. */
  def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] =
    rdd.takeSample(withReplacement, num, seed).toSeq.asJava

  /**
   * Return the first element in this RDD.
   */
  def first(): T = rdd.first()

  /**
   * @return true if and only if the RDD contains no elements at all. Note that an RDD
   *         may be empty even when it has at least 1 partition.
   */
  def isEmpty(): Boolean = rdd.isEmpty()

  /**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String): Unit = {
    rdd.saveAsTextFile(path)
  }

  /**
   * Save this RDD as a compressed text file, using string representations of elements.
   */
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
    rdd.saveAsTextFile(path, codec)
  }

  /**
   * Save this RDD as a SequenceFile of serialized objects.
   */
  def saveAsObjectFile(path: String): Unit = {
    rdd.saveAsObjectFile(path)
  }

  /**
   * Creates tuples of the elements in this RDD by applying `f`.
   */
  def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
    // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
    implicit val ctag: ClassTag[U] = fakeClassTag
    JavaPairRDD.fromRDD(rdd.keyBy(f))
  }

  /**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with SparkContext.setCheckpointDir() and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
  def checkpoint(): Unit = {
    rdd.checkpoint()
  }

  /**
   * Return whether this RDD has been checkpointed or not.
   */
  def isCheckpointed: Boolean = rdd.isCheckpointed

  /**
   * Gets the name of the file to which this RDD was checkpointed.
   */
  def getCheckpointFile(): Optional[String] = {
    JavaUtils.optionToOptional(rdd.getCheckpointFile)
  }

  /** A description of this RDD and its recursive dependencies for debugging. */
  def toDebugString(): String = {
    rdd.toDebugString
  }

  /**
   * Returns the top k (largest) elements from this RDD as defined by
   * the specified Comparator[T] and maintains the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   * @param num k, the number of top elements to return
   * @param comp the comparator that defines the order
   * @return an array of top elements
   */
  def top(num: Int, comp: Comparator[T]): JList[T] = {
    rdd.top(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
  }

  /**
   * Returns the top k (largest) elements from this RDD using the
   * natural ordering for T and maintains the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   * @param num k, the number of top elements to return
   * @return an array of top elements
   */
  def top(num: Int): JList[T] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
    top(num, comp)
  }
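
  // A minimal sketch for `top` above, assuming a hypothetical `nums: JavaRDD[jl.Integer]` and a
  // Scala caller:
  //   nums.top(3)                                        // the three largest values, descending
  //   nums.top(3, Comparator.reverseOrder[jl.Integer]()) // reversed order: the three smallest
  // `takeOrdered` below is the mirror image, returning the smallest elements in ascending order.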

  /**
   * Returns the first k (smallest) elements from this RDD as defined by
   * the specified Comparator[T] and maintains the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   * @param num k, the number of elements to return
   * @param comp the comparator that defines the order
   * @return an array of top elements
   */
  def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
    rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
  }

  /**
   * Returns the maximum element from this RDD as defined by the specified
   * Comparator[T].
   *
   * @param comp the comparator that defines ordering
   * @return the maximum of the RDD
   */
  def max(comp: Comparator[T]): T = {
    rdd.max()(Ordering.comparatorToOrdering(comp))
  }

  /**
   * Returns the minimum element from this RDD as defined by the specified
   * Comparator[T].
   *
   * @param comp the comparator that defines ordering
   * @return the minimum of the RDD
   */
  def min(comp: Comparator[T]): T = {
    rdd.min()(Ordering.comparatorToOrdering(comp))
  }

  /**
   * Returns the first k (smallest) elements from this RDD using the
   * natural ordering for T while maintaining the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   * @param num k, the number of top elements to return
   * @return an array of top elements
   */
  def takeOrdered(num: Int): JList[T] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
    takeOrdered(num, comp)
  }

  /**
   * Return approximate number of distinct elements in the RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
  def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

  /** Return the name of this RDD, or null if no name has been set. */
  def name(): String = rdd.name

  /**
   * The asynchronous version of `count`, which returns a
   * future for counting the number of elements in this RDD.
   */
  def countAsync(): JavaFutureAction[jl.Long] = {
    new JavaFutureActionWrapper[Long, jl.Long](rdd.countAsync(), jl.Long.valueOf)
  }

  /**
   * The asynchronous version of `collect`, which returns a future for
   * retrieving an array containing all of the elements in this RDD.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collectAsync(): JavaFutureAction[JList[T]] = {
    new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
  }

  /**
   * The asynchronous version of the `take` action, which returns a
   * future for retrieving the first `num` elements of this RDD.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
    new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
  }

  /**
   * The asynchronous version of the `foreach` action, which
   * applies a function f to all the elements of this RDD.
   */
  def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
    new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
      { x => null.asInstanceOf[Void] })
  }

  /**
   * The asynchronous version of the `foreachPartition` action, which
   * applies a function f to each partition of this RDD.
   */
  def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
    new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
      { x => null.asInstanceOf[Void] })
  }
}