/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.java

import java.{lang => jl}
import java.lang.{Iterable => JIterable}
import java.util.{Comparator, List => JList}

import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
  extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {

  override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)

  override val classTag: ClassTag[(K, V)] = rdd.elementClassTag

  import JavaPairRDD._

  // Common RDD functions

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. Can only be called once on each RDD.
   */
  def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.persist(newLevel))

  /**
   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
   * This method blocks until all blocks are deleted.
   */
  def unpersist(): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist())

  /**
   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
   *
   * @param blocking Whether to block until all blocks are deleted.
   */
  def unpersist(blocking: Boolean): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist(blocking))

  // Transformations (return a new RDD)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.distinct(numPartitions))

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   */
  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
    fromRDD(rdd.coalesce(numPartitions, shuffle))

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.repartition(numPartitions))

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] =
    sample(withReplacement, fraction, Utils.random.nextLong)

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
   * RDD, to produce a sample of size that's approximately equal to the sum of
   * math.ceil(numItems * samplingRate) over all key values.
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double],
      seed: Long): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.sampleByKey(
      withReplacement,
      fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
      seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
   * RDD, to produce a sample of size that's approximately equal to the sum of
   * math.ceil(numItems * samplingRate) over all key values.
   *
   * Use Utils.random.nextLong as the default seed for the random number generator.
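   *
   * A minimal usage sketch from Java (the pair RDD `pairs` and its keys are hypothetical,
   * shown only for illustration):
   * {{{
   *   java.util.Map<String, Double> fractions = new java.util.HashMap<>();
   *   fractions.put("a", 0.1);  // keep roughly 10% of the pairs with key "a"
   *   fractions.put("b", 0.5);  // keep roughly 50% of the pairs with key "b"
   *   JavaPairRDD<String, Integer> sampled = pairs.sampleByKey(false, fractions);
   * }}}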
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double]): JavaPairRDD[K, V] =
    sampleByKey(withReplacement, fractions, Utils.random.nextLong)

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
   * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
   *
   * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
   * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
   * over all key values with a 99.99% confidence. When sampling without replacement, we need one
   * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
   * two additional passes.
   */
  def sampleByKeyExact(withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double],
      seed: Long): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.sampleByKeyExact(
      withReplacement,
      fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
      seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
   * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
   *
   * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
   * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
   * over all key values with a 99.99% confidence. When sampling without replacement, we need one
   * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
   * two additional passes.
   *
   * Use Utils.random.nextLong as the default seed for the random number generator.
   */
  def sampleByKeyExact(
      withReplacement: Boolean,
      fractions: java.util.Map[K, jl.Double]): JavaPairRDD[K, V] =
    sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.union(other.rdd))

  /**
   * Return the intersection of this RDD and another one. The output will not contain any duplicate
   * elements, even if the input RDDs did.
   *
   * @note This method performs a shuffle internally.
   */
  def intersection(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.intersection(other.rdd))


  // first() has to be overridden here so that the generated method has the signature
  // 'public scala.Tuple2 first()'; if the trait's definition is used,
  // then the method has the signature 'public java.lang.Object first()',
  // causing NoSuchMethodErrors at runtime.
  override def first(): (K, V) = rdd.first()

  // Pair RDD functions

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
   * "combined type" C.
   *
   * Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, the serializer that is used
   * for the shuffle, and whether to perform map-side aggregation (if a mapper can produce multiple
   * items with the same key).
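   *
   * A minimal sketch of the intended call pattern from Java (`pairs`, its contents, and the
   * partition count are hypothetical; `HashPartitioner` is org.apache.spark.HashPartitioner):
   * {{{
   *   JavaPairRDD<String, java.util.List<Integer>> grouped = pairs.combineByKey(
   *     v -> { java.util.List<Integer> c = new java.util.ArrayList<>(); c.add(v); return c; },
   *     (c, v) -> { c.add(v); return c; },
   *     (c1, c2) -> { c1.addAll(c2); return c1; },
   *     new HashPartitioner(4),
   *     true,   // perform map-side aggregation
   *     null);  // null selects the default shuffle serializer
   * }}}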
   *
   * @note V and C can be different -- for example, one might group an RDD of type (Int, Int) into
   * an RDD of type (Int, List[Int]).
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C],
      partitioner: Partitioner,
      mapSideCombine: Boolean,
      serializer: Serializer): JavaPairRDD[K, C] = {
    implicit val ctag: ClassTag[C] = fakeClassTag
    fromRDD(rdd.combineByKeyWithClassTag(
      createCombiner,
      mergeValue,
      mergeCombiners,
      partitioner,
      mapSideCombine,
      serializer
    ))
  }

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
   * "combined type" C.
   *
   * Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD. This method automatically
   * uses map-side aggregation in shuffling the RDD.
   *
   * @note V and C can be different -- for example, one might group an RDD of type (Int, Int) into
   * an RDD of type (Int, List[Int]).
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C],
      partitioner: Partitioner): JavaPairRDD[K, C] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, true, null)
  }

  /**
   * Simplified version of combineByKey that hash-partitions the output RDD and uses map-side
   * aggregation.
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C],
      numPartitions: Int): JavaPairRDD[K, C] =
    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.reduceByKey(partitioner, func))

  /**
   * Merge the values for each key using an associative and commutative reduce function, but return
   * the result immediately to the master as a Map. This will also perform the merging locally on
   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
    mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))

  /** Count the number of elements for each key, and return the result to the master as a Map. */
  def countByKey(): java.util.Map[K, jl.Long] =
    mapAsSerializableJavaMap(rdd.countByKey()).asInstanceOf[java.util.Map[K, jl.Long]]

  /**
   * Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
   */
  def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
    rdd.countByKeyApprox(timeout).map(mapAsSerializableJavaMap)

  /**
   * Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
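   *
   * A rough usage sketch from Java; `pairs` is a hypothetical JavaPairRDD<String, Integer>, the
   * timeout is in milliseconds, and `PartialResult`/`BoundedDouble` come from
   * org.apache.spark.partial:
   * {{{
   *   PartialResult<java.util.Map<String, BoundedDouble>> approx =
   *     pairs.countByKeyApprox(10000, 0.95);
   *   java.util.Map<String, BoundedDouble> counts = approx.getFinalValue();
   * }}}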
   */
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
      : PartialResult[java.util.Map[K, BoundedDouble]] =
    rdd.countByKeyApprox(timeout, confidence).map(mapAsSerializableJavaMap)

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U],
      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue, partitioner)(seqFunc, combFunc))
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],
      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
   * The former operation is used for merging values within a partition, and the latter is used for
   * merging values between partitions. To avoid memory allocation, both of these functions are
   * allowed to modify and return their first argument instead of creating a new U.
   */
  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]):
      JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
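   *
   * A small illustrative sketch from Java (`pairs` is a hypothetical JavaPairRDD<String, Integer>;
   * `HashPartitioner` is org.apache.spark.HashPartitioner):
   * {{{
   *   JavaPairRDD<String, Integer> sums =
   *     pairs.foldByKey(0, new HashPartitioner(4), (a, b) -> a + b);
   * }}}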
   */
  def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V])
      : JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))

  /**
   * Merge the values for each key using an associative function and a neutral "zero value"
   * which may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.foldByKey(zeroValue)(func))

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
    fromRDD(rdd.reduceByKey(func, numPartitions))

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   *
   * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
   * will provide much better performance.
   */
  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD into `numPartitions` partitions.
   *
   * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
   * will provide much better performance.
   */
  def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other, numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other, p))

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtractByKey[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, V] = {
    implicit val ctag: ClassTag[W] = fakeClassTag
    fromRDD(rdd.subtractByKey(other))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, V] = {
    implicit val ctag: ClassTag[W] = fakeClassTag
    fromRDD(rdd.subtractByKey(other, numPartitions))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W](other: JavaPairRDD[K, W], p: Partitioner): JavaPairRDD[K, V] = {
    implicit val ctag: ClassTag[W] = fakeClassTag
    fromRDD(rdd.subtractByKey(other, p))
  }

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
    fromRDD(rdd.partitionBy(partitioner))

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
    fromRDD(rdd.join(other, partitioner))

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
      : JavaPairRDD[K, (V, Optional[W])] = {
    val joinResult = rdd.leftOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
      : JavaPairRDD[K, (Optional[V], W)] = {
    val joinResult = rdd.rightOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
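   *
   * A minimal sketch from Java; `ages` (JavaPairRDD<String, Integer>) and `addresses`
   * (JavaPairRDD<String, String>) are hypothetical, `Tuple2` is scala.Tuple2, and `Optional`
   * is org.apache.spark.api.java.Optional:
   * {{{
   *   JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<String>>> joined =
   *     ages.fullOuterJoin(addresses, new HashPartitioner(4));
   * }}}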
   */
  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
    val joinResult = rdd.fullOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{ case (v, w) =>
      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
    })
  }

  /**
   * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
   * partitioner/parallelism level and using map-side aggregation.
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
      mergeValue: JFunction2[C, V, C],
      mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
    implicit val ctag: ClassTag[C] = fakeClassTag
    fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
    fromRDD(reduceByKey(defaultPartitioner(rdd), func))
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level.
   *
   * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
   * will provide much better performance.
   */
  def groupByKey(): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey()))

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
    fromRDD(rdd.join(other))

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
    fromRDD(rdd.join(other, numPartitions))

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * using the existing partitioner/parallelism level.
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
    val joinResult = rdd.leftOuterJoin(other)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * into `numPartitions` partitions.
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
      : JavaPairRDD[K, (V, Optional[W])] = {
    val joinResult = rdd.leftOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD using the existing partitioner/parallelism level.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
    val joinResult = rdd.rightOuterJoin(other)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD into the given number of partitions.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
      : JavaPairRDD[K, (Optional[V], W)] = {
    val joinResult = rdd.rightOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
   * parallelism level.
   */
  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
    val joinResult = rdd.fullOuterJoin(other)
    fromRDD(joinResult.mapValues{ case (v, w) =>
      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
    })
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
   */
  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
    val joinResult = rdd.fullOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{ case (v, w) =>
      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
    })
  }

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   *
   * @note this method should only be used if the resulting data is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collectAsMap(): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.collectAsMap())


  /**
   * Pass each value in the key-value pair RDD through a map function without changing the keys;
   * this also retains the original RDD's partitioning.
   */
  def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.mapValues(f))
  }

  /**
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
   */
  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
    def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.flatMapValues(fn))
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
      : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
      partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3],
      partitioner: Partitioner)
      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
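   *
   * A minimal sketch from Java; `ages` (JavaPairRDD<String, Integer>) and `addresses`
   * (JavaPairRDD<String, String>) are hypothetical and `Tuple2` is scala.Tuple2:
   * {{{
   *   JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<String>>> grouped =
   *     ages.cogroup(addresses);
   * }}}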
   */
  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.cogroup(other)))

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3])
      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
      : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3],
      numPartitions: Int)
      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))

  /** Alias for cogroup. */
  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.groupWith(other)))

  /** Alias for cogroup. */
  def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
    fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))

  /** Alias for cogroup. */
  def groupWith[W1, W2, W3](other1: JavaPairRDD[K, W1],
      other2: JavaPairRDD[K, W2],
      other3: JavaPairRDD[K, W3])
      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
    fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): JList[V] = rdd.lookup(key).asJava

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      conf: JobConf) {
    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
  }

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F]) {
    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
  }

  /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      codec: Class[_ <: CompressionCodec]) {
    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
  }

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      conf: Configuration) {
    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system, using
   * a Configuration object for that storage system.
   */
  def saveAsNewAPIHadoopDataset(conf: Configuration) {
    rdd.saveAsNewAPIHadoopDataset(conf)
  }

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F]) {
    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
   * that storage system. The JobConf should set an OutputFormat and any output paths required
   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
   * MapReduce job.
   */
  def saveAsHadoopDataset(conf: JobConf) {
    rdd.saveAsHadoopDataset(conf)
  }

  /**
   * Repartition the RDD according to the given partitioner and, within each resulting partition,
   * sort records by their keys.
   *
   * This is more efficient than calling `repartition` and then sorting within each partition
   * because it can push the sorting down into the shuffle machinery.
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
    repartitionAndSortWithinPartitions(partitioner, comp)
  }

  /**
   * Repartition the RDD according to the given partitioner and, within each resulting partition,
   * sort records by their keys.
   *
   * This is more efficient than calling `repartition` and then sorting within each partition
   * because it can push the sorting down into the shuffle machinery.
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K])
      : JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(
      new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartitions(partitioner))
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements in
   * ascending order. Calling `collect` or `save` on the resulting RDD will return or output an
   * ordered list of records (in the `save` case, they will be written to multiple `part-X` files
   * in the filesystem, in order of the keys).
   */
  def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
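   *
   * For example (illustrative only; `pairs` is a hypothetical JavaPairRDD<String, Integer>):
   * {{{
   *   JavaPairRDD<String, Integer> byKeyDescending = pairs.sortByKey(false);
   * }}}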
   */
  def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
    sortByKey(comp, ascending)
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
    sortByKey(comp, ascending, numPartitions)
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V] = sortByKey(comp, true)

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
  }

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
  }

  /**
   * Return an RDD with the keys of each tuple.
   */
  def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1))

  /**
   * Return an RDD with the values of each tuple.
   */
  def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param partitioner partitioner of the resulting RDD.
   */
  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner)
      : JavaPairRDD[K, jl.Long] = {
    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)).
      asInstanceOf[JavaPairRDD[K, jl.Long]]
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param numPartitions number of partitions of the resulting RDD.
   */
  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, jl.Long] = {
    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)).
      asInstanceOf[JavaPairRDD[K, jl.Long]]
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, jl.Long] = {
    fromRDD(rdd.countApproxDistinctByKey(relativeSD)).asInstanceOf[JavaPairRDD[K, jl.Long]]
  }

  /** Assign a name to this RDD */
  def setName(name: String): JavaPairRDD[K, V] = {
    rdd.setName(name)
    this
  }
}

object JavaPairRDD {
  private[spark]
  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
    rddToPairRDDFunctions(rdd).mapValues(_.asJava)
  }

  private[spark]
  def cogroupResultToJava[K: ClassTag, V, W](
      rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava))
  }

  private[spark]
  def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava))
  }

  private[spark]
  def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava, x._4.asJava))
  }

  def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
    new JavaPairRDD[K, V](rdd)
  }

  implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd

  private[spark]
  implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
    (x: T1, x1: T2) => fun.call(x, x1)
  }

  private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)

  private[spark]
  implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)

  /**
   * Convert a JavaRDD of key-value pairs to JavaPairRDD.
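   *
   * A minimal sketch, assuming an existing JavaRDD<Tuple2<String, Integer>> named `tupleRdd`
   * (hypothetical; `Tuple2` is scala.Tuple2):
   * {{{
   *   JavaPairRDD<String, Integer> pairs = JavaPairRDD.fromJavaRDD(tupleRdd);
   * }}}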
   */
  def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
    implicit val ctagK: ClassTag[K] = fakeClassTag
    implicit val ctagV: ClassTag[V] = fakeClassTag
    new JavaPairRDD[K, V](rdd.rdd)
  }

}