/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.java

import java.{lang => jl}
import java.lang.{Iterable => JIterable}
import java.util.{Comparator, List => JList}

import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
                       (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
  extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {

  override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)

  override val classTag: ClassTag[(K, V)] = rdd.elementClassTag

  import JavaPairRDD._

  // Common RDD functions

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. Can only be called once on each RDD.
   */
  def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.persist(newLevel))

  /**
   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
   * This method blocks until all blocks are deleted.
   */
  def unpersist(): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist())

  /**
   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
   *
   * @param blocking Whether to block until all blocks are deleted.
   */
  def unpersist(blocking: Boolean): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist(blocking))

  // Transformations (return a new RDD)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int): JavaPairRDD[K, V] =
      new JavaPairRDD[K, V](rdd.distinct(numPartitions))

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   */
  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
    fromRDD(rdd.coalesce(numPartitions, shuffle))

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.repartition(numPartitions))

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] =
    sample(withReplacement, fraction, Utils.random.nextLong)

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
   * RDD, to produce a sample of size that's approximately equal to the sum of
   * math.ceil(numItems * samplingRate) over all key values.
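   *
   * A hypothetical usage sketch from Java (the names `pairs` and `fractions` below are
   * assumptions, not part of this API):
   * {{{
   *   Map<String, Double> fractions = new HashMap<>();
   *   fractions.put("a", 0.1);   // keep ~10% of pairs with key "a"
   *   fractions.put("b", 0.5);   // keep ~50% of pairs with key "b"
   *   JavaPairRDD<String, Integer> sampled = pairs.sampleByKey(false, fractions, 42L);
   * }}}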
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double],
      seed: Long): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.sampleByKey(
      withReplacement,
      fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
      seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
   * RDD, to produce a sample of size that's approximately equal to the sum of
   * math.ceil(numItems * samplingRate) over all key values.
   *
   * Use Utils.random.nextLong as the default seed for the random number generator.
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double]): JavaPairRDD[K, V] =
    sampleByKey(withReplacement, fractions, Utils.random.nextLong)

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
   * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
   *
   * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
   * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
   * over all key values with a 99.99% confidence. When sampling without replacement, we need one
   * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
   * two additional passes.
   */
  def sampleByKeyExact(withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double],
      seed: Long): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.sampleByKeyExact(
      withReplacement,
      fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
      seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
   * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
   *
   * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
   * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
   * over all key values with a 99.99% confidence. When sampling without replacement, we need one
   * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
   * two additional passes.
   *
   * Use Utils.random.nextLong as the default seed for the random number generator.
   */
  def sampleByKeyExact(
      withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double]): JavaPairRDD[K, V] =
    sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.union(other.rdd))

  /**
   * Return the intersection of this RDD and another one. The output will not contain any duplicate
   * elements, even if the input RDDs did.
   *
   * @note This method performs a shuffle internally.
   */
  def intersection(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.intersection(other.rdd))

  // first() has to be overridden here so that the generated method has the signature
  // 'public scala.Tuple2 first()'; if the trait's definition is used,
  // then the method has the signature 'public java.lang.Object first()',
  // causing NoSuchMethodErrors at runtime.
  override def first(): (K, V) = rdd.first()

  // Pair RDD functions

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
   * "combined type" C.
   *
   * Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, the serializer that is
   * used for the shuffle, and whether to perform map-side aggregation (if a mapper can produce
   * multiple items with the same key).
   *
   * @note V and C can be different -- for example, one might group an RDD of type (Int, Int) into
   * an RDD of type (Int, List[Int]).
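   *
   * A minimal sketch from Java, grouping values into per-key lists; it assumes Java 8 lambdas
   * and an existing `JavaPairRDD<String, Integer> pairs` (both assumptions, not part of this API):
   * {{{
   *   JavaPairRDD<String, List<Integer>> grouped =
   *     pairs.<List<Integer>>combineByKey(
   *       v -> new ArrayList<>(Arrays.asList(v)),      // createCombiner: V -> C
   *       (list, v) -> { list.add(v); return list; },  // mergeValue: (C, V) -> C
   *       (l1, l2) -> { l1.addAll(l2); return l1; },   // mergeCombiners: (C, C) -> C
   *       new HashPartitioner(4));
   * }}}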
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C],
      partitioner: Partitioner,
      mapSideCombine: Boolean,
      serializer: Serializer): JavaPairRDD[K, C] = {
    implicit val ctag: ClassTag[C] = fakeClassTag
    fromRDD(rdd.combineByKeyWithClassTag(
      createCombiner,
      mergeValue,
      mergeCombiners,
      partitioner,
      mapSideCombine,
      serializer
    ))
  }

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
   * "combined type" C.
   *
   * Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD. This method automatically
   * uses map-side aggregation in shuffling the RDD.
   *
   * @note V and C can be different -- for example, one might group an RDD of type (Int, Int) into
   * an RDD of type (Int, List[Int]).
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C],
      partitioner: Partitioner): JavaPairRDD[K, C] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, true, null)
  }

  /**
   * Simplified version of combineByKey that hash-partitions the output RDD and uses map-side
   * aggregation.
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C],
      numPartitions: Int): JavaPairRDD[K, C] =
    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.reduceByKey(partitioner, func))

  /**
   * Merge the values for each key using an associative and commutative reduce function, but return
   * the result immediately to the master as a Map. This will also perform the merging locally on
   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
    mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))

  /** Count the number of elements for each key, and return the result to the master as a Map. */
  def countByKey(): java.util.Map[K, jl.Long] =
    mapAsSerializableJavaMap(rdd.countByKey()).asInstanceOf[java.util.Map[K, jl.Long]]

  /**
   * Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
   */
  def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
    rdd.countByKeyApprox(timeout).map(mapAsSerializableJavaMap)

  /**
   * Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
   */
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
  : PartialResult[java.util.Map[K, BoundedDouble]] =
    rdd.countByKeyApprox(timeout, confidence).map(mapAsSerializableJavaMap)

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
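   *
   * A hypothetical sketch from Java, summing values per key with a zero of 0 (so U equals V
   * here); the name `pairs` is an assumption:
   * {{{
   *   JavaPairRDD<String, Integer> sums =
   *     pairs.aggregateByKey(0, new HashPartitioner(4), (u, v) -> u + v, (u1, u2) -> u1 + u2);
   * }}}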
   */
  def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U],
      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue, partitioner)(seqFunc, combFunc))
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],
      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
   * The former operation is used for merging values within a partition, and the latter is used for
   * merging values between partitions. To avoid memory allocation, both of these functions are
   * allowed to modify and return their first argument instead of creating a new U.
   */
  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]):
      JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V])
  : JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))

  /**
   * Merge the values for each key using an associative function and a neutral "zero value"
   * which may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
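   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, Integer> folded = pairs.foldByKey(0, (a, b) -> a + b);
   * }}}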
   */
  def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.foldByKey(zeroValue)(func))

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
    fromRDD(rdd.reduceByKey(func, numPartitions))

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   *
   * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
   * will provide much better performance.
   */
  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD into `numPartitions` partitions.
   *
   * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
   * will provide much better performance.
   */
  def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be no larger than `this`.
   */
  def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other, numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other, p))

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   *
   * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be no larger than `this`.
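   *
   * A hypothetical sketch from Java (the names `pairs` and `other` are assumptions):
   * {{{
   *   // drop every pair whose key also appears in `other`
   *   JavaPairRDD<String, Integer> remaining = pairs.subtractByKey(other);
   * }}}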
   */
  def subtractByKey[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, V] = {
    implicit val ctag: ClassTag[W] = fakeClassTag
    fromRDD(rdd.subtractByKey(other))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, V] = {
    implicit val ctag: ClassTag[W] = fakeClassTag
    fromRDD(rdd.subtractByKey(other, numPartitions))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W](other: JavaPairRDD[K, W], p: Partitioner): JavaPairRDD[K, V] = {
    implicit val ctag: ClassTag[W] = fakeClassTag
    fromRDD(rdd.subtractByKey(other, p))
  }

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
    fromRDD(rdd.partitionBy(partitioner))

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
    fromRDD(rdd.join(other, partitioner))

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
  : JavaPairRDD[K, (V, Optional[W])] = {
    val joinResult = rdd.leftOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
  : JavaPairRDD[K, (Optional[V], W)] = {
    val joinResult = rdd.rightOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
   */
  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
    val joinResult = rdd.fullOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{ case (v, w) =>
      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
    })
  }

  /**
   * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
   * partitioner/parallelism level and using map-side aggregation.
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
    implicit val ctag: ClassTag[C] = fakeClassTag
    fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
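   *
   * A hypothetical word-count-style sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
   * }}}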
   */
  def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
    fromRDD(reduceByKey(defaultPartitioner(rdd), func))
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level.
   *
   * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
   * will provide much better performance.
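   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, Iterable<Integer>> groups = pairs.groupByKey();
   * }}}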
   */
  def groupByKey(): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey()))

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
    fromRDD(rdd.join(other))

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
    fromRDD(rdd.join(other, numPartitions))

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * using the existing partitioner/parallelism level.
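   *
   * A hypothetical sketch from Java (the names `pairs` and `other` are assumptions):
   * {{{
   *   JavaPairRDD<String, Tuple2<Integer, Optional<String>>> joined =
   *     pairs.leftOuterJoin(other);
   * }}}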
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
    val joinResult = rdd.leftOuterJoin(other)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * into `numPartitions` partitions.
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
  : JavaPairRDD[K, (V, Optional[W])] = {
    val joinResult = rdd.leftOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD using the existing partitioner/parallelism level.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
    val joinResult = rdd.rightOuterJoin(other)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD into the given number of partitions.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
  : JavaPairRDD[K, (Optional[V], W)] = {
    val joinResult = rdd.rightOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
   * parallelism level.
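   *
   * A hypothetical sketch from Java (the names `pairs` and `other` are assumptions):
   * {{{
   *   JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<String>>> all =
   *     pairs.fullOuterJoin(other);
   * }}}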
   */
  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
    val joinResult = rdd.fullOuterJoin(other)
    fromRDD(joinResult.mapValues{ case (v, w) =>
      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
    })
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
   */
  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
    val joinResult = rdd.fullOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{ case (v, w) =>
      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
    })
  }

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   *
   * @note this method should only be used if the resulting data is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collectAsMap(): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.collectAsMap())

  /**
   * Pass each value in the key-value pair RDD through a map function without changing the keys;
   * this also retains the original RDD's partitioning.
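   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, String> rendered = pairs.mapValues(v -> "value=" + v);
   * }}}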
   */
  def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.mapValues(f))
  }

  /**
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
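   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, Integer> expanded = pairs.flatMapValues(v -> Arrays.asList(v, -v));
   * }}}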
   */
  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
    def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.flatMapValues(fn))
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
  : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
      partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3],
      partitioner: Partitioner)
  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
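   *
   * A hypothetical sketch from Java (the names `pairs` and `other` are assumptions):
   * {{{
   *   JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<String>>> grouped =
   *     pairs.cogroup(other);
   * }}}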
   */
  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.cogroup(other)))

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3])
  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
  : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3],
      numPartitions: Int)
  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))

  /** Alias for cogroup. */
  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.groupWith(other)))

  /** Alias for cogroup. */
  def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))

  /** Alias for cogroup. */
  def groupWith[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3])
  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
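   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   List<Integer> valuesForA = pairs.lookup("a");
   * }}}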
   */
  def lookup(key: K): JList[V] = rdd.lookup(key).asJava

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      conf: JobConf) {
    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
  }

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F]) {
    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
  }

  /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      codec: Class[_ <: CompressionCodec]) {
    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
  }

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      conf: Configuration) {
    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system, using
   * a Configuration object for that storage system.
   */
  def saveAsNewAPIHadoopDataset(conf: Configuration) {
    rdd.saveAsNewAPIHadoopDataset(conf)
  }

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F]) {
    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
   * that storage system. The JobConf should set an OutputFormat and any output paths required
   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
   * MapReduce job.
   */
  def saveAsHadoopDataset(conf: JobConf) {
    rdd.saveAsHadoopDataset(conf)
  }

  /**
   * Repartition the RDD according to the given partitioner and, within each resulting partition,
   * sort records by their keys.
   *
   * This is more efficient than calling `repartition` and then sorting within each partition
   * because it can push the sorting down into the shuffle machinery.
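   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, Integer> shuffled =
   *     pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8));
   * }}}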
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
    repartitionAndSortWithinPartitions(partitioner, comp)
  }

  /**
   * Repartition the RDD according to the given partitioner and, within each resulting partition,
   * sort records by their keys.
   *
   * This is more efficient than calling `repartition` and then sorting within each partition
   * because it can push the sorting down into the shuffle machinery.
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K])
    : JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(
      new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartitions(partitioner))
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements in
   * ascending order. Calling `collect` or `save` on the resulting RDD will return or output an
   * ordered list of records (in the `save` case, they will be written to multiple `part-X` files
   * in the filesystem, in order of the keys).
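   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, Integer> ascending = pairs.sortByKey();
   *   JavaPairRDD<String, Integer> descending = pairs.sortByKey(false);
   * }}}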
   */
  def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
    sortByKey(comp, ascending)
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
    sortByKey(comp, ascending, numPartitions)
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V] = sortByKey(comp, true)

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
  }

  /**
   * Return an RDD with the keys of each tuple.
   */
  def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1))

  /**
   * Return an RDD with the values of each tuple.
   */
  def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param partitioner partitioner of the resulting RDD.
   */
  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner)
  : JavaPairRDD[K, jl.Long] = {
    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)).
      asInstanceOf[JavaPairRDD[K, jl.Long]]
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param numPartitions number of partitions of the resulting RDD.
   */
  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, jl.Long] = {
    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)).
      asInstanceOf[JavaPairRDD[K, jl.Long]]
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
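   *
   * A hypothetical sketch from Java (the name `pairs` is an assumption):
   * {{{
   *   JavaPairRDD<String, Long> distinctPerKey = pairs.countApproxDistinctByKey(0.05);
   * }}}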
   */
  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, jl.Long] = {
    fromRDD(rdd.countApproxDistinctByKey(relativeSD)).asInstanceOf[JavaPairRDD[K, jl.Long]]
  }

  /** Assign a name to this RDD */
  def setName(name: String): JavaPairRDD[K, V] = {
    rdd.setName(name)
    this
  }
}

object JavaPairRDD {
  private[spark]
  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
    rddToPairRDDFunctions(rdd).mapValues(_.asJava)
  }

  private[spark]
  def cogroupResultToJava[K: ClassTag, V, W](
      rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava))
  }

  private[spark]
  def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava))
  }

  private[spark]
  def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
  : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava, x._4.asJava))
  }

  def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
    new JavaPairRDD[K, V](rdd)
  }

  implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd

  private[spark]
  implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
    (x: T1, x1: T2) => fun.call(x, x1)
  }

  private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)

  private[spark]
  implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)

  /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
  def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
    implicit val ctagK: ClassTag[K] = fakeClassTag
    implicit val ctagV: ClassTag[V] = fakeClassTag
    new JavaPairRDD[K, V](rdd.rdd)
  }

}