/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.java

import java.{lang => jl}
import java.lang.{Iterable => JIterable}
import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark._
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
 * As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
 * of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
 * from the trait. See SPARK-3266 for additional details.
 */
private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
  extends JavaRDDLike[T, This]

/**
 * Defines operations common to several Java RDD implementations.
 *
 * @note This trait is not intended to be implemented by user code.
 */
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  def wrapRDD(rdd: RDD[T]): This

  implicit val classTag: ClassTag[T]

  def rdd: RDD[T]

  /** Set of partitions in this RDD. */
  def partitions: JList[Partition] = rdd.partitions.toSeq.asJava

  /** Return the number of partitions in this RDD. */
  @Since("1.6.0")
  def getNumPartitions: Int = rdd.getNumPartitions

  /** The partitioner of this RDD. */
  def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner)

  /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
  def context: SparkContext = rdd.context

  /** A unique ID for this RDD (within its SparkContext). */
  def id: Int = rdd.id

  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
  def getStorageLevel: StorageLevel = rdd.getStorageLevel

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
    rdd.iterator(split, taskContext).asJava

  // Transformations (return a new RDD)

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[R](f: JFunction[T, R]): JavaRDD[R] =
    new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)

  /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the
   * index of the original partition.
   */
  def mapPartitionsWithIndex[R](
      f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
      preservesPartitioning: Boolean = false): JavaRDD[R] =
    new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
      preservesPartitioning)(fakeClassTag))(fakeClassTag)

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
    new JavaDoubleRDD(rdd.map(f.call(_).doubleValue()))
  }

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
    new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
    def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
    JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
    def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
    new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue()))
  }

  /**
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
    def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
    JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
    def fn: (Iterator[T]) => Iterator[U] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
      preservesPartitioning: Boolean): JavaRDD[U] = {
    def fn: (Iterator[T]) => Iterator[U] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaRDD.fromRDD(
      rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
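   *
   * An illustrative Java usage sketch (not part of the original source; assumes an existing
   * `JavaRDD<String> lines` and the usual `java.util` imports):
   * {{{
   *   JavaDoubleRDD lengths = lines.mapPartitionsToDouble(it -> {
   *     List<Double> out = new ArrayList<>();
   *     while (it.hasNext()) {
   *       out.add((double) it.next().length());  // one Double per input element
   *     }
   *     return out.iterator();
   *   });
   * }}}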
   */
  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
    def fn: (Iterator[T]) => Iterator[jl.Double] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue()))
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]):
      JavaPairRDD[K2, V2] = {
    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
      preservesPartitioning: Boolean): JavaDoubleRDD = {
    def fn: (Iterator[T]) => Iterator[jl.Double] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
      .map(_.doubleValue()))
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
      preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
    JavaPairRDD.fromRDD(
      rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
    rdd.foreachPartition(x => f.call(x.asJava))
  }

  /**
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): JavaRDD[JList[T]] =
    new JavaRDD(rdd.glom().map(_.toSeq.asJava))

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] =
    JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classTag))(classTag, other.classTag)

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key.
   */
  def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
    // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
    implicit val ctagK: ClassTag[U] = fakeClassTag
    implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
  }

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key.
   */
  def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
    // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
    implicit val ctagK: ClassTag[U] = fakeClassTag
    implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
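   *
   * An illustrative Java usage sketch (not part of the original source; assumes an existing
   * `JavaRDD<String> lines` and that the `tr` utility is available on every worker):
   * {{{
   *   // Each element is written to the process's stdin, one per line; each line
   *   // of the process's stdout becomes an element of the result RDD.
   *   JavaRDD<String> upper = lines.pipe("tr a-z A-Z");
   * }}}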
   */
  def pipe(command: String): JavaRDD[String] = {
    rdd.pipe(command)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String]): JavaRDD[String] = {
    rdd.pipe(command.asScala)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
    rdd.pipe(command.asScala, env.asScala)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String],
      env: JMap[String, String],
      separateWorkingDir: Boolean,
      bufferSize: Int): JavaRDD[String] = {
    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: JList[String],
      env: JMap[String, String],
      separateWorkingDir: Boolean,
      bufferSize: Int,
      encoding: String): JavaRDD[String] = {
    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
  }

  /**
   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
   * partitions* and the *same number of elements in each partition* (e.g. one was made through
   * a map on the other).
   */
  def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
    JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
  }

  /**
   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
   * applying a function to the zipped partitions. Assumes that all the RDDs have the
   * *same number of partitions*, but does *not* require them to have the same number
   * of elements in each partition.
   */
  def zipPartitions[U, V](
      other: JavaRDDLike[U, _],
      f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
    }
    JavaRDD.fromRDD(
      rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
  }

  /**
   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
   * won't trigger a Spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
   */
  def zipWithUniqueId(): JavaPairRDD[T, jl.Long] = {
    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, jl.Long]]
  }

  /**
   * Zips this RDD with its element indices. The ordering is first based on the partition index
   * and then the ordering of items within each partition. So the first item in the first
   * partition gets index 0, and the last item in the last partition receives the largest index.
   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
   * This method needs to trigger a Spark job when this RDD contains more than one partition.
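   *
   * An illustrative Java usage sketch (not part of the original source; assumes an existing
   * `JavaRDD<String> lines`):
   * {{{
   *   // Indices are assigned 0, 1, 2, ... across partitions in partition order.
   *   JavaPairRDD<String, Long> indexed = lines.zipWithIndex();
   * }}}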
   */
  def zipWithIndex(): JavaPairRDD[T, jl.Long] = {
    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, jl.Long]]
  }

  // Actions (launch a job to return a value to the user program)

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: VoidFunction[T]): Unit = {
    rdd.foreach(x => f.call(x))
  }

  /**
   * Return an array that contains all of the elements in this RDD.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collect(): JList[T] =
    rdd.collect().toSeq.asJava

  /**
   * Return an iterator that contains all of the elements in this RDD.
   *
   * The iterator will consume as much memory as the largest partition in this RDD.
   */
  def toLocalIterator(): JIterator[T] =
    asJavaIteratorConverter(rdd.toLocalIterator).asJava

  /**
   * Return an array that contains all of the elements in a specific partition of this RDD.
   */
  def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
    // This is useful for implementing `take` from other language frontends
    // like Python where the data is serialized.
    val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds)
    res.map(_.toSeq.asJava)
  }

  /**
   * Reduces the elements of this RDD using the specified commutative and associative binary
   * operator.
   */
  def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)

  /**
   * Reduces the elements of this RDD in a multi-level tree pattern.
   *
   * @param depth suggested depth of the tree
   * @see [[org.apache.spark.api.java.JavaRDDLike#reduce]]
   */
  def treeReduce(f: JFunction2[T, T, T], depth: Int): T = rdd.treeReduce(f, depth)

  /**
   * [[org.apache.spark.api.java.JavaRDDLike#treeReduce]] with suggested depth 2.
   */
  def treeReduce(f: JFunction2[T, T, T]): T = treeReduce(f, 2)

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function
   * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
   * allocation; however, it should not modify t2.
   *
   * This behaves somewhat differently from fold operations implemented for non-distributed
   * collections in functional languages like Scala. This fold operation may be applied to
   * partitions individually, and then fold those results into the final result, rather than
   * apply the fold to each element sequentially in some defined ordering. For functions
   * that are not commutative, the result may differ from that of a fold applied to a
   * non-distributed collection.
   */
  def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
    rdd.fold(zeroValue)(f)

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * given combine functions and a neutral "zero value". This function can return a different result
   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into a U
   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
   * allowed to modify and return their first argument instead of creating a new U to avoid memory
   * allocation.
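   *
   * An illustrative Java usage sketch (not part of the original source; assumes an existing
   * `JavaRDD<String> lines`):
   * {{{
   *   // Sum the lengths of all strings: seqOp folds a String into the Integer
   *   // accumulator, combOp merges two per-partition accumulators.
   *   Integer totalLength = lines.aggregate(0,
   *     (acc, s) -> acc + s.length(),
   *     (a, b) -> a + b);
   * }}}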
   */
  def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
      combOp: JFunction2[U, U, U]): U =
    rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])

  /**
   * Aggregates the elements of this RDD in a multi-level tree pattern.
   *
   * @param depth suggested depth of the tree
   * @see [[org.apache.spark.api.java.JavaRDDLike#aggregate]]
   */
  def treeAggregate[U](
      zeroValue: U,
      seqOp: JFunction2[U, T, U],
      combOp: JFunction2[U, U, U],
      depth: Int): U = {
    rdd.treeAggregate(zeroValue)(seqOp, combOp, depth)(fakeClassTag[U])
  }

  /**
   * [[org.apache.spark.api.java.JavaRDDLike#treeAggregate]] with suggested depth 2.
   */
  def treeAggregate[U](
      zeroValue: U,
      seqOp: JFunction2[U, T, U],
      combOp: JFunction2[U, U, U]): U = {
    treeAggregate(zeroValue, seqOp, combOp, 2)
  }

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = rdd.count()

  /**
   * Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
    rdd.countApprox(timeout, confidence)

  /**
   * Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   */
  def countApprox(timeout: Long): PartialResult[BoundedDouble] =
    rdd.countApprox(timeout)

  /**
   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
   * combine step happens locally on the master, equivalent to running a single reduce task.
   */
  def countByValue(): JMap[T, jl.Long] =
    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]

  /**
   * Approximate version of countByValue().
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countByValueApprox(
      timeout: Long,
      confidence: Double
      ): PartialResult[JMap[T, BoundedDouble]] =
    rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)

  /**
   * Approximate version of countByValue().
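   *
   * An illustrative Java usage sketch (not part of the original source; assumes an existing
   * `JavaRDD<String> lines`):
   * {{{
   *   PartialResult<Map<String, BoundedDouble>> approx =
   *     lines.countByValueApprox(1000L);  // wait at most one second
   *   Map<String, BoundedDouble> bounds = approx.getFinalValue();  // blocks for the final value
   * }}}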
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @return a potentially incomplete result, with error bounds
   */
  def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
    rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)

  /**
   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
   * whole RDD instead.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def take(num: Int): JList[T] =
    rdd.take(num).toSeq.asJava

  def takeSample(withReplacement: Boolean, num: Int): JList[T] =
    takeSample(withReplacement, num, Utils.random.nextLong)

  def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] =
    rdd.takeSample(withReplacement, num, seed).toSeq.asJava

  /**
   * Return the first element in this RDD.
   */
  def first(): T = rdd.first()

  /**
   * @return true if and only if the RDD contains no elements at all. Note that an RDD
   *         may be empty even when it has at least 1 partition.
   */
  def isEmpty(): Boolean = rdd.isEmpty()

  /**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String): Unit = {
    rdd.saveAsTextFile(path)
  }

  /**
   * Save this RDD as a compressed text file, using string representations of elements.
   */
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
    rdd.saveAsTextFile(path, codec)
  }

  /**
   * Save this RDD as a SequenceFile of serialized objects.
   */
  def saveAsObjectFile(path: String): Unit = {
    rdd.saveAsObjectFile(path)
  }

  /**
   * Creates tuples of the elements in this RDD by applying `f`.
   */
  def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
    // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
    implicit val ctag: ClassTag[U] = fakeClassTag
    JavaPairRDD.fromRDD(rdd.keyBy(f))
  }

  /**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with SparkContext.setCheckpointDir() and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it to a file will require recomputation.
   */
  def checkpoint(): Unit = {
    rdd.checkpoint()
  }

  /**
   * Return whether this RDD has been checkpointed or not.
   */
  def isCheckpointed: Boolean = rdd.isCheckpointed

  /**
   * Gets the name of the file to which this RDD was checkpointed.
   */
  def getCheckpointFile(): Optional[String] = {
    JavaUtils.optionToOptional(rdd.getCheckpointFile)
  }

  /** A description of this RDD and its recursive dependencies for debugging. */
  def toDebugString(): String = {
    rdd.toDebugString
  }

  /**
   * Returns the top k (largest) elements from this RDD as defined by
   * the specified Comparator[T] and maintains the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
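   *
   * An illustrative Java usage sketch (not part of the original source; assumes an existing
   * `JavaRDD<String> lines`):
   * {{{
   *   // The comparator is shipped to executors, so it must be Serializable.
   *   List<String> longest = lines.top(3,
   *     (Comparator<String> & java.io.Serializable) (a, b) ->
   *       Integer.compare(a.length(), b.length()));
   * }}}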
   * @param num k, the number of top elements to return
   * @param comp the comparator that defines the order
   * @return an array of top elements
   */
  def top(num: Int, comp: Comparator[T]): JList[T] = {
    rdd.top(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
  }

  /**
   * Returns the top k (largest) elements from this RDD using the
   * natural ordering for T and maintains the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   * @param num k, the number of top elements to return
   * @return an array of top elements
   */
  def top(num: Int): JList[T] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
    top(num, comp)
  }

  /**
   * Returns the first k (smallest) elements from this RDD as defined by
   * the specified Comparator[T] and maintains the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   * @param num k, the number of elements to return
   * @param comp the comparator that defines the order
   * @return an array of the first elements
   */
  def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
    rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
  }

  /**
   * Returns the maximum element from this RDD as defined by the specified
   * Comparator[T].
   *
   * @param comp the comparator that defines ordering
   * @return the maximum of the RDD
   */
  def max(comp: Comparator[T]): T = {
    rdd.max()(Ordering.comparatorToOrdering(comp))
  }

  /**
   * Returns the minimum element from this RDD as defined by the specified
   * Comparator[T].
   *
   * @param comp the comparator that defines ordering
   * @return the minimum of the RDD
   */
  def min(comp: Comparator[T]): T = {
    rdd.min()(Ordering.comparatorToOrdering(comp))
  }

  /**
   * Returns the first k (smallest) elements from this RDD using the
   * natural ordering for T while maintaining the order.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   * @param num k, the number of elements to return
   * @return an array of the first elements
   */
  def takeOrdered(num: Int): JList[T] = {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
    takeOrdered(num, comp)
  }

  /**
   * Return approximate number of distinct elements in the RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
  def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

  def name(): String = rdd.name

  /**
   * The asynchronous version of `count`, which returns a
   * future for counting the number of elements in this RDD.
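   *
   * An illustrative Java usage sketch (not part of the original source; assumes an existing
   * `JavaRDD<String> lines`):
   * {{{
   *   JavaFutureAction<Long> future = lines.countAsync();
   *   // JavaFutureAction extends java.util.concurrent.Future, so get() blocks
   *   // until the job completes (and may throw the usual Future exceptions).
   *   Long count = future.get();
   * }}}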
   */
  def countAsync(): JavaFutureAction[jl.Long] = {
    new JavaFutureActionWrapper[Long, jl.Long](rdd.countAsync(), jl.Long.valueOf)
  }

  /**
   * The asynchronous version of `collect`, which returns a future for
   * retrieving an array containing all of the elements in this RDD.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collectAsync(): JavaFutureAction[JList[T]] = {
    new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
  }

  /**
   * The asynchronous version of the `take` action, which returns a
   * future for retrieving the first `num` elements of this RDD.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
    new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
  }

  /**
   * The asynchronous version of the `foreach` action, which
   * applies a function f to all the elements of this RDD.
   */
  def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
    new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
      { x => null.asInstanceOf[Void] })
  }

  /**
   * The asynchronous version of the `foreachPartition` action, which
   * applies a function f to each partition of this RDD.
   */
  def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
    new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
      { x => null.asInstanceOf[Void] })
  }
}