/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import java.util.Random

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.io.Codec
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
  SamplingUtils}

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of
 * key-value pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
 * through implicit conversions.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions.
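 *
 * As a rough illustration (not part of the Spark API), a trivial custom RDD backed by an
 * in-memory collection might be sketched as follows; the `LocalSeqRDD` name and its fields
 * are hypothetical:
 * {{{
 * class LocalSeqRDD[T: ClassTag](sc: SparkContext, data: Seq[Seq[T]])
 *   extends RDD[T](sc, Nil) {
 *
 *   // One partition per inner sequence.
 *   override protected def getPartitions: Array[Partition] =
 *     data.indices.map(i => new Partition { override def index: Int = i }).toArray
 *
 *   // Recompute a partition by re-reading the corresponding slice of `data`.
 *   override def compute(split: Partition, context: TaskContext): Iterator[T] =
 *     data(split.index).iterator
 * }
 * }}}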
 * Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 */
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
    // This is a warning instead of an exception in order to avoid breaking user programs that
    // might have defined nested RDDs without running jobs with them.
    logWarning("Spark does not support nested RDDs (see SPARK-5063)")
  }

  private def sc: SparkContext = {
    if (_sc == null) {
      throw new SparkException(
        "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " +
        "transformations and actions are NOT invoked by the driver, but inside of other " +
        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
        "because the values transformation and count action cannot be performed inside of the " +
        "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
        "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
        "an RDD not defined by the streaming job is used in DStream operations. For more " +
        "information, See SPARK-13758.")
    }
    _sc
  }

  /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  private[spark] def conf = sc.conf

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext).
*/ 149 val id: Int = sc.newRddId() 150 151 /** A friendly name for this RDD */ 152 @transient var name: String = null 153 154 /** Assign a name to this RDD */ 155 def setName(_name: String): this.type = { 156 name = _name 157 this 158 } 159 160 /** 161 * Mark this RDD for persisting using the specified level. 162 * 163 * @param newLevel the target storage level 164 * @param allowOverride whether to override any existing level with the new one 165 */ 166 private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = { 167 // TODO: Handle changes of StorageLevel 168 if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) { 169 throw new UnsupportedOperationException( 170 "Cannot change storage level of an RDD after it was already assigned a level") 171 } 172 // If this is the first time this RDD is marked for persisting, register it 173 // with the SparkContext for cleanups and accounting. Do this only once. 174 if (storageLevel == StorageLevel.NONE) { 175 sc.cleaner.foreach(_.registerRDDForCleanup(this)) 176 sc.persistRDD(this) 177 } 178 storageLevel = newLevel 179 this 180 } 181 182 /** 183 * Set this RDD's storage level to persist its values across operations after the first time 184 * it is computed. This can only be used to assign a new storage level if the RDD does not 185 * have a storage level set yet. Local checkpointing is an exception. 186 */ 187 def persist(newLevel: StorageLevel): this.type = { 188 if (isLocallyCheckpointed) { 189 // This means the user previously called localCheckpoint(), which should have already 190 // marked this RDD for persisting. Here we should override the old storage level with 191 // one that is explicitly requested by the user (after adapting it to use disk). 192 persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true) 193 } else { 194 persist(newLevel, allowOverride = false) 195 } 196 } 197 198 /** 199 * Persist this RDD with the default storage level (`MEMORY_ONLY`). 200 */ 201 def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) 202 203 /** 204 * Persist this RDD with the default storage level (`MEMORY_ONLY`). 205 */ 206 def cache(): this.type = persist() 207 208 /** 209 * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 210 * 211 * @param blocking Whether to block until all blocks are deleted. 212 * @return This RDD. 213 */ 214 def unpersist(blocking: Boolean = true): this.type = { 215 logInfo("Removing RDD " + id + " from persistence list") 216 sc.unpersistRDD(id, blocking) 217 storageLevel = StorageLevel.NONE 218 this 219 } 220 221 /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ 222 def getStorageLevel: StorageLevel = storageLevel 223 224 // Our dependencies and partitions will be gotten by calling subclass's methods below, and will 225 // be overwritten when we're checkpointed 226 private var dependencies_ : Seq[Dependency[_]] = null 227 @transient private var partitions_ : Array[Partition] = null 228 229 /** An Option holding our checkpoint RDD, if we are checkpointed */ 230 private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD) 231 232 /** 233 * Get the list of dependencies of this RDD, taking into account whether the 234 * RDD is checkpointed or not. 
   */
  final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

  /**
   * Get the array of partitions of this RDD, taking into account whether the
   * RDD is checkpointed or not.
   */
  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
        partitions_.zipWithIndex.foreach { case (partition, index) =>
          require(partition.index == index,
            s"partitions($index).partition == ${partition.index}, but it should equal $index")
        }
      }
      partitions_
    }
  }

  /**
   * Returns the number of partitions of this RDD.
   */
  @Since("1.6.0")
  final def getNumPartitions: Int = partitions.length

  /**
   * Get the preferred locations of a partition, taking into account whether the
   * RDD is checkpointed.
   */
  final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
      getPreferredLocations(split)
    }
  }

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  /**
   * Return the ancestors of the given RDD that are related to it only through a sequence of
   * narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
   * no ordering on the RDDs returned.
   */
  private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
    val ancestors = new mutable.HashSet[RDD[_]]

    def visit(rdd: RDD[_]) {
      val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
      val narrowParents = narrowDependencies.map(_.rdd)
      val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
      narrowParentsNotVisited.foreach { parent =>
        ancestors.add(parent)
        visit(parent)
      }
    }

    visit(this)

    // In case there is a cycle, do not include the root itself
    ancestors.filterNot(_ == this).toSeq
  }

  /**
   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

  /**
   * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
   */
  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    val blockId = RDDBlockId(id, partition.index)
    var readCachedBlock = true
    // This method is called on executors, so we need to call SparkEnv.get instead of sc.env.
334 SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { 335 readCachedBlock = false 336 computeOrReadCheckpoint(partition, context) 337 }) match { 338 case Left(blockResult) => 339 if (readCachedBlock) { 340 val existingMetrics = context.taskMetrics().inputMetrics 341 existingMetrics.incBytesRead(blockResult.bytes) 342 new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) { 343 override def next(): T = { 344 existingMetrics.incRecordsRead(1) 345 delegate.next() 346 } 347 } 348 } else { 349 new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) 350 } 351 case Right(iter) => 352 new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]]) 353 } 354 } 355 356 /** 357 * Execute a block of code in a scope such that all new RDDs created in this body will 358 * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}. 359 * 360 * Note: Return statements are NOT allowed in the given body. 361 */ 362 private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body) 363 364 // Transformations (return a new RDD) 365 366 /** 367 * Return a new RDD by applying a function to all elements of this RDD. 368 */ 369 def map[U: ClassTag](f: T => U): RDD[U] = withScope { 370 val cleanF = sc.clean(f) 371 new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) 372 } 373 374 /** 375 * Return a new RDD by first applying a function to all elements of this 376 * RDD, and then flattening the results. 377 */ 378 def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { 379 val cleanF = sc.clean(f) 380 new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) 381 } 382 383 /** 384 * Return a new RDD containing only the elements that satisfy a predicate. 385 */ 386 def filter(f: T => Boolean): RDD[T] = withScope { 387 val cleanF = sc.clean(f) 388 new MapPartitionsRDD[T, T]( 389 this, 390 (context, pid, iter) => iter.filter(cleanF), 391 preservesPartitioning = true) 392 } 393 394 /** 395 * Return a new RDD containing the distinct elements in this RDD. 396 */ 397 def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 398 map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) 399 } 400 401 /** 402 * Return a new RDD containing the distinct elements in this RDD. 403 */ 404 def distinct(): RDD[T] = withScope { 405 distinct(partitions.length) 406 } 407 408 /** 409 * Return a new RDD that has exactly numPartitions partitions. 410 * 411 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses 412 * a shuffle to redistribute data. 413 * 414 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 415 * which can avoid performing a shuffle. 416 */ 417 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 418 coalesce(numPartitions, shuffle = true) 419 } 420 421 /** 422 * Return a new RDD that is reduced into `numPartitions` partitions. 423 * 424 * This results in a narrow dependency, e.g. if you go from 1000 partitions 425 * to 100 partitions, there will not be a shuffle, instead each of the 100 426 * new partitions will claim 10 of the current partitions. If a larger number 427 * of partitions is requested, it will stay at the current number of partitions. 428 * 429 * However, if you're doing a drastic coalesce, e.g. 
to numPartitions = 1, 430 * this may result in your computation taking place on fewer nodes than 431 * you like (e.g. one node in the case of numPartitions = 1). To avoid this, 432 * you can pass shuffle = true. This will add a shuffle step, but means the 433 * current upstream partitions will be executed in parallel (per whatever 434 * the current partitioning is). 435 * 436 * @note With shuffle = true, you can actually coalesce to a larger number 437 * of partitions. This is useful if you have a small number of partitions, 438 * say 100, potentially with a few partitions being abnormally large. Calling 439 * coalesce(1000, shuffle = true) will result in 1000 partitions with the 440 * data distributed using a hash partitioner. The optional partition coalescer 441 * passed in must be serializable. 442 */ 443 def coalesce(numPartitions: Int, shuffle: Boolean = false, 444 partitionCoalescer: Option[PartitionCoalescer] = Option.empty) 445 (implicit ord: Ordering[T] = null) 446 : RDD[T] = withScope { 447 require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") 448 if (shuffle) { 449 /** Distributes elements evenly across output partitions, starting from a random partition. */ 450 val distributePartition = (index: Int, items: Iterator[T]) => { 451 var position = (new Random(index)).nextInt(numPartitions) 452 items.map { t => 453 // Note that the hash code of the key will just be the key itself. The HashPartitioner 454 // will mod it with the number of total partitions. 455 position = position + 1 456 (position, t) 457 } 458 } : Iterator[(Int, T)] 459 460 // include a shuffle step so that our upstream tasks are still distributed 461 new CoalescedRDD( 462 new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), 463 new HashPartitioner(numPartitions)), 464 numPartitions, 465 partitionCoalescer).values 466 } else { 467 new CoalescedRDD(this, numPartitions, partitionCoalescer) 468 } 469 } 470 471 /** 472 * Return a sampled subset of this RDD. 473 * 474 * @param withReplacement can elements be sampled multiple times (replaced when sampled out) 475 * @param fraction expected size of the sample as a fraction of this RDD's size 476 * without replacement: probability that each element is chosen; fraction must be [0, 1] 477 * with replacement: expected number of times each element is chosen; fraction must be greater 478 * than or equal to 0 479 * @param seed seed for the random number generator 480 * 481 * @note This is NOT guaranteed to provide exactly the fraction of the count 482 * of the given [[RDD]]. 483 */ 484 def sample( 485 withReplacement: Boolean, 486 fraction: Double, 487 seed: Long = Utils.random.nextLong): RDD[T] = { 488 require(fraction >= 0, 489 s"Fraction must be nonnegative, but got ${fraction}") 490 491 withScope { 492 require(fraction >= 0.0, "Negative fraction value: " + fraction) 493 if (withReplacement) { 494 new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed) 495 } else { 496 new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed) 497 } 498 } 499 } 500 501 /** 502 * Randomly splits this RDD with the provided weights. 
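   *
   * For example, a minimal sketch of a 70/30 split into hypothetical `training` and `test` RDDs:
   * {{{
   * val Array(training, test) = rdd.randomSplit(Array(0.7, 0.3), seed = 11L)
   * }}}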
503 * 504 * @param weights weights for splits, will be normalized if they don't sum to 1 505 * @param seed random seed 506 * 507 * @return split RDDs in an array 508 */ 509 def randomSplit( 510 weights: Array[Double], 511 seed: Long = Utils.random.nextLong): Array[RDD[T]] = { 512 require(weights.forall(_ >= 0), 513 s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}") 514 require(weights.sum > 0, 515 s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}") 516 517 withScope { 518 val sum = weights.sum 519 val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) 520 normalizedCumWeights.sliding(2).map { x => 521 randomSampleWithRange(x(0), x(1), seed) 522 }.toArray 523 } 524 } 525 526 527 /** 528 * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability 529 * range. 530 * @param lb lower bound to use for the Bernoulli sampler 531 * @param ub upper bound to use for the Bernoulli sampler 532 * @param seed the seed for the Random number generator 533 * @return A random sub-sample of the RDD without replacement. 534 */ 535 private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = { 536 this.mapPartitionsWithIndex( { (index, partition) => 537 val sampler = new BernoulliCellSampler[T](lb, ub) 538 sampler.setSeed(seed + index) 539 sampler.sample(partition) 540 }, preservesPartitioning = true) 541 } 542 543 /** 544 * Return a fixed-size sampled subset of this RDD in an array 545 * 546 * @param withReplacement whether sampling is done with replacement 547 * @param num size of the returned sample 548 * @param seed seed for the random number generator 549 * @return sample of specified size in an array 550 * 551 * @note this method should only be used if the resulting array is expected to be small, as 552 * all the data is loaded into the driver's memory. 553 */ 554 def takeSample( 555 withReplacement: Boolean, 556 num: Int, 557 seed: Long = Utils.random.nextLong): Array[T] = withScope { 558 val numStDev = 10.0 559 560 require(num >= 0, "Negative number of elements requested") 561 require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt), 562 "Cannot support a sample size > Int.MaxValue - " + 563 s"$numStDev * math.sqrt(Int.MaxValue)") 564 565 if (num == 0) { 566 new Array[T](0) 567 } else { 568 val initialCount = this.count() 569 if (initialCount == 0) { 570 new Array[T](0) 571 } else { 572 val rand = new Random(seed) 573 if (!withReplacement && num >= initialCount) { 574 Utils.randomizeInPlace(this.collect(), rand) 575 } else { 576 val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, 577 withReplacement) 578 var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() 579 580 // If the first sample didn't turn out large enough, keep trying to take samples; 581 // this shouldn't happen often because we use a big multiplier for the initial size 582 var numIters = 0 583 while (samples.length < num) { 584 logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") 585 samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() 586 numIters += 1 587 } 588 Utils.randomizeInPlace(samples, rand).take(num) 589 } 590 } 591 } 592 } 593 594 /** 595 * Return the union of this RDD and another one. Any identical elements will appear multiple 596 * times (use `.distinct()` to eliminate them). 
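   *
   * A small sketch with assumed inputs:
   * {{{
   * val a = sc.parallelize(Seq(1, 2, 3))
   * val b = sc.parallelize(Seq(3, 4, 5))
   * a.union(b).collect()             // Array(1, 2, 3, 3, 4, 5) -- the duplicate 3 is kept
   * a.union(b).distinct().collect()  // duplicates removed
   * }}}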
   */
  def union(other: RDD[T]): RDD[T] = withScope {
    sc.union(this, other)
  }

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def ++(other: RDD[T]): RDD[T] = withScope {
    this.union(other)
  }

  /**
   * Return this RDD sorted by the given key function.
   */
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }

  /**
   * Return the intersection of this RDD and another one. The output will not contain any
   * duplicate elements, even if the input RDDs did.
   *
   * @note This method performs a shuffle internally.
   */
  def intersection(other: RDD[T]): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }

  /**
   * Return the intersection of this RDD and another one. The output will not contain any
   * duplicate elements, even if the input RDDs did.
   *
   * @note This method performs a shuffle internally.
   *
   * @param partitioner Partitioner to use for the resulting RDD
   */
  def intersection(
      other: RDD[T],
      partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }

  /**
   * Return the intersection of this RDD and another one. The output will not contain any
   * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster.
   *
   * @note This method performs a shuffle internally.
   *
   * @param numPartitions How many partitions to use in the resulting RDD
   */
  def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
    intersection(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
    new CartesianRDD(sc, this, other)
  }

  /**
   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   */
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy[K](f, defaultPartitioner(this))
  }

  /**
   * Return an RDD of grouped elements.
Each group consists of a key and a sequence of elements 693 * mapping to that key. The ordering of elements within each group is not guaranteed, and 694 * may even differ each time the resulting RDD is evaluated. 695 * 696 * @note This operation may be very expensive. If you are grouping in order to perform an 697 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 698 * or `PairRDDFunctions.reduceByKey` will provide much better performance. 699 */ 700 def groupBy[K]( 701 f: T => K, 702 numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { 703 groupBy(f, new HashPartitioner(numPartitions)) 704 } 705 706 /** 707 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements 708 * mapping to that key. The ordering of elements within each group is not guaranteed, and 709 * may even differ each time the resulting RDD is evaluated. 710 * 711 * @note This operation may be very expensive. If you are grouping in order to perform an 712 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 713 * or `PairRDDFunctions.reduceByKey` will provide much better performance. 714 */ 715 def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null) 716 : RDD[(K, Iterable[T])] = withScope { 717 val cleanF = sc.clean(f) 718 this.map(t => (cleanF(t), t)).groupByKey(p) 719 } 720 721 /** 722 * Return an RDD created by piping elements to a forked external process. 723 */ 724 def pipe(command: String): RDD[String] = withScope { 725 // Similar to Runtime.exec(), if we are given a single string, split it into words 726 // using a standard StringTokenizer (i.e. by spaces) 727 pipe(PipedRDD.tokenize(command)) 728 } 729 730 /** 731 * Return an RDD created by piping elements to a forked external process. 732 */ 733 def pipe(command: String, env: Map[String, String]): RDD[String] = withScope { 734 // Similar to Runtime.exec(), if we are given a single string, split it into words 735 // using a standard StringTokenizer (i.e. by spaces) 736 pipe(PipedRDD.tokenize(command), env) 737 } 738 739 /** 740 * Return an RDD created by piping elements to a forked external process. The resulting RDD 741 * is computed by executing the given process once per partition. All elements 742 * of each input partition are written to a process's stdin as lines of input separated 743 * by a newline. The resulting partition consists of the process's stdout output, with 744 * each line of stdout resulting in one element of the output partition. A process is invoked 745 * even for empty partitions. 746 * 747 * The print behavior can be customized by providing two functions. 748 * 749 * @param command command to run in forked process. 750 * @param env environment variables to set. 751 * @param printPipeContext Before piping elements, this function is called as an opportunity 752 * to pipe context data. Print line function (like out.println) will be 753 * passed as printPipeContext's parameter. 754 * @param printRDDElement Use this function to customize how to pipe elements. This function 755 * will be called with each RDD element as the 1st parameter, and the 756 * print line function (like out.println()) as the 2nd parameter. 
757 * An example of pipe the RDD data of groupBy() in a streaming way, 758 * instead of constructing a huge String to concat all the elements: 759 * {{{ 760 * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) = 761 * for (e <- record._2) {f(e)} 762 * }}} 763 * @param separateWorkingDir Use separate working directories for each task. 764 * @param bufferSize Buffer size for the stdin writer for the piped process. 765 * @param encoding Char encoding used for interacting (via stdin, stdout and stderr) with 766 * the piped process 767 * @return the result RDD 768 */ 769 def pipe( 770 command: Seq[String], 771 env: Map[String, String] = Map(), 772 printPipeContext: (String => Unit) => Unit = null, 773 printRDDElement: (T, String => Unit) => Unit = null, 774 separateWorkingDir: Boolean = false, 775 bufferSize: Int = 8192, 776 encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope { 777 new PipedRDD(this, command, env, 778 if (printPipeContext ne null) sc.clean(printPipeContext) else null, 779 if (printRDDElement ne null) sc.clean(printRDDElement) else null, 780 separateWorkingDir, 781 bufferSize, 782 encoding) 783 } 784 785 /** 786 * Return a new RDD by applying a function to each partition of this RDD. 787 * 788 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which 789 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. 790 */ 791 def mapPartitions[U: ClassTag]( 792 f: Iterator[T] => Iterator[U], 793 preservesPartitioning: Boolean = false): RDD[U] = withScope { 794 val cleanedF = sc.clean(f) 795 new MapPartitionsRDD( 796 this, 797 (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), 798 preservesPartitioning) 799 } 800 801 /** 802 * [performance] Spark's internal mapPartitionsWithIndex method that skips closure cleaning. 803 * It is a performance API to be used carefully only if we are sure that the RDD elements are 804 * serializable and don't require closure cleaning. 805 * 806 * @param preservesPartitioning indicates whether the input function preserves the partitioner, 807 * which should be `false` unless this is a pair RDD and the input function doesn't modify 808 * the keys. 809 */ 810 private[spark] def mapPartitionsWithIndexInternal[U: ClassTag]( 811 f: (Int, Iterator[T]) => Iterator[U], 812 preservesPartitioning: Boolean = false): RDD[U] = withScope { 813 new MapPartitionsRDD( 814 this, 815 (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter), 816 preservesPartitioning) 817 } 818 819 /** 820 * [performance] Spark's internal mapPartitions method that skips closure cleaning. 821 */ 822 private[spark] def mapPartitionsInternal[U: ClassTag]( 823 f: Iterator[T] => Iterator[U], 824 preservesPartitioning: Boolean = false): RDD[U] = withScope { 825 new MapPartitionsRDD( 826 this, 827 (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter), 828 preservesPartitioning) 829 } 830 831 /** 832 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index 833 * of the original partition. 834 * 835 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which 836 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. 
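   *
   * For example, a sketch that tags every element with the partition it came from:
   * {{{
   * rdd.mapPartitionsWithIndex { (index, iter) =>
   *   iter.map(x => s"partition $index: $x")
   * }
   * }}}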
837 */ 838 def mapPartitionsWithIndex[U: ClassTag]( 839 f: (Int, Iterator[T]) => Iterator[U], 840 preservesPartitioning: Boolean = false): RDD[U] = withScope { 841 val cleanedF = sc.clean(f) 842 new MapPartitionsRDD( 843 this, 844 (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter), 845 preservesPartitioning) 846 } 847 848 /** 849 * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, 850 * second element in each RDD, etc. Assumes that the two RDDs have the *same number of 851 * partitions* and the *same number of elements in each partition* (e.g. one was made through 852 * a map on the other). 853 */ 854 def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { 855 zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) => 856 new Iterator[(T, U)] { 857 def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { 858 case (true, true) => true 859 case (false, false) => false 860 case _ => throw new SparkException("Can only zip RDDs with " + 861 "same number of elements in each partition") 862 } 863 def next(): (T, U) = (thisIter.next(), otherIter.next()) 864 } 865 } 866 } 867 868 /** 869 * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by 870 * applying a function to the zipped partitions. Assumes that all the RDDs have the 871 * *same number of partitions*, but does *not* require them to have the same number 872 * of elements in each partition. 873 */ 874 def zipPartitions[B: ClassTag, V: ClassTag] 875 (rdd2: RDD[B], preservesPartitioning: Boolean) 876 (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope { 877 new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) 878 } 879 880 def zipPartitions[B: ClassTag, V: ClassTag] 881 (rdd2: RDD[B]) 882 (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope { 883 zipPartitions(rdd2, preservesPartitioning = false)(f) 884 } 885 886 def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] 887 (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) 888 (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope { 889 new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning) 890 } 891 892 def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] 893 (rdd2: RDD[B], rdd3: RDD[C]) 894 (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope { 895 zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f) 896 } 897 898 def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] 899 (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) 900 (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope { 901 new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning) 902 } 903 904 def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] 905 (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) 906 (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope { 907 zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f) 908 } 909 910 911 // Actions (launch a job to return a value to the user program) 912 913 /** 914 * Applies a function f to all elements of this RDD. 
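   *
   * Note that `f` runs on the executors, so a sketch like the following prints to the executor
   * logs rather than to the driver console:
   * {{{
   * rdd.foreach(x => println(x))
   * }}}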
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

  /**
   * Return an array that contains all of the elements in this RDD.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  /**
   * Return an iterator that contains all of the elements in this RDD.
   *
   * The iterator will consume as much memory as the largest partition in this RDD.
   *
   * @note This results in multiple Spark jobs, and if the input RDD is the result
   * of a wide transformation (e.g. join with different partitioners), the input RDD
   * should be cached first to avoid recomputing it.
   */
  def toLocalIterator: Iterator[T] = withScope {
    def collectPartition(p: Int): Array[T] = {
      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
    }
    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
  }

  /**
   * Return an RDD that contains all matching values by applying `f`.
   */
  def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    filter(cleanF.isDefinedAt).map(cleanF)
  }

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: RDD[T]): RDD[T] = withScope {
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
  }

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
    subtract(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(
      other: RDD[T],
      p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions: Int = p.numPartitions
        override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }

  /**
   * Reduces the elements of this RDD using the specified commutative and
   * associative binary operator.
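   *
   * For example, a minimal sum over a hypothetical RDD of integers:
   * {{{
   * sc.parallelize(1 to 100).reduce(_ + _)  // 5050
   * }}}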
1007 */ 1008 def reduce(f: (T, T) => T): T = withScope { 1009 val cleanF = sc.clean(f) 1010 val reducePartition: Iterator[T] => Option[T] = iter => { 1011 if (iter.hasNext) { 1012 Some(iter.reduceLeft(cleanF)) 1013 } else { 1014 None 1015 } 1016 } 1017 var jobResult: Option[T] = None 1018 val mergeResult = (index: Int, taskResult: Option[T]) => { 1019 if (taskResult.isDefined) { 1020 jobResult = jobResult match { 1021 case Some(value) => Some(f(value, taskResult.get)) 1022 case None => taskResult 1023 } 1024 } 1025 } 1026 sc.runJob(this, reducePartition, mergeResult) 1027 // Get the final result out of our Option, or throw an exception if the RDD was empty 1028 jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) 1029 } 1030 1031 /** 1032 * Reduces the elements of this RDD in a multi-level tree pattern. 1033 * 1034 * @param depth suggested depth of the tree (default: 2) 1035 * @see [[org.apache.spark.rdd.RDD#reduce]] 1036 */ 1037 def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope { 1038 require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") 1039 val cleanF = context.clean(f) 1040 val reducePartition: Iterator[T] => Option[T] = iter => { 1041 if (iter.hasNext) { 1042 Some(iter.reduceLeft(cleanF)) 1043 } else { 1044 None 1045 } 1046 } 1047 val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it))) 1048 val op: (Option[T], Option[T]) => Option[T] = (c, x) => { 1049 if (c.isDefined && x.isDefined) { 1050 Some(cleanF(c.get, x.get)) 1051 } else if (c.isDefined) { 1052 c 1053 } else if (x.isDefined) { 1054 x 1055 } else { 1056 None 1057 } 1058 } 1059 partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth) 1060 .getOrElse(throw new UnsupportedOperationException("empty collection")) 1061 } 1062 1063 /** 1064 * Aggregate the elements of each partition, and then the results for all the partitions, using a 1065 * given associative function and a neutral "zero value". The function 1066 * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object 1067 * allocation; however, it should not modify t2. 1068 * 1069 * This behaves somewhat differently from fold operations implemented for non-distributed 1070 * collections in functional languages like Scala. This fold operation may be applied to 1071 * partitions individually, and then fold those results into the final result, rather than 1072 * apply the fold to each element sequentially in some defined ordering. For functions 1073 * that are not commutative, the result may differ from that of a fold applied to a 1074 * non-distributed collection. 1075 * 1076 * @param zeroValue the initial value for the accumulated result of each partition for the `op` 1077 * operator, and also the initial value for the combine results from different 1078 * partitions for the `op` operator - this will typically be the neutral 1079 * element (e.g. 
`Nil` for list concatenation or `0` for summation) 1080 * @param op an operator used to both accumulate results within a partition and combine results 1081 * from different partitions 1082 */ 1083 def fold(zeroValue: T)(op: (T, T) => T): T = withScope { 1084 // Clone the zero value since we will also be serializing it as part of tasks 1085 var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) 1086 val cleanOp = sc.clean(op) 1087 val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp) 1088 val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult) 1089 sc.runJob(this, foldPartition, mergeResult) 1090 jobResult 1091 } 1092 1093 /** 1094 * Aggregate the elements of each partition, and then the results for all the partitions, using 1095 * given combine functions and a neutral "zero value". This function can return a different result 1096 * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U 1097 * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are 1098 * allowed to modify and return their first argument instead of creating a new U to avoid memory 1099 * allocation. 1100 * 1101 * @param zeroValue the initial value for the accumulated result of each partition for the 1102 * `seqOp` operator, and also the initial value for the combine results from 1103 * different partitions for the `combOp` operator - this will typically be the 1104 * neutral element (e.g. `Nil` for list concatenation or `0` for summation) 1105 * @param seqOp an operator used to accumulate results within a partition 1106 * @param combOp an associative operator used to combine results from different partitions 1107 */ 1108 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { 1109 // Clone the zero value since we will also be serializing it as part of tasks 1110 var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) 1111 val cleanSeqOp = sc.clean(seqOp) 1112 val cleanCombOp = sc.clean(combOp) 1113 val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) 1114 val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult) 1115 sc.runJob(this, aggregatePartition, mergeResult) 1116 jobResult 1117 } 1118 1119 /** 1120 * Aggregates the elements of this RDD in a multi-level tree pattern. 1121 * 1122 * @param depth suggested depth of the tree (default: 2) 1123 * @see [[org.apache.spark.rdd.RDD#aggregate]] 1124 */ 1125 def treeAggregate[U: ClassTag](zeroValue: U)( 1126 seqOp: (U, T) => U, 1127 combOp: (U, U) => U, 1128 depth: Int = 2): U = withScope { 1129 require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") 1130 if (partitions.length == 0) { 1131 Utils.clone(zeroValue, context.env.closureSerializer.newInstance()) 1132 } else { 1133 val cleanSeqOp = context.clean(seqOp) 1134 val cleanCombOp = context.clean(combOp) 1135 val aggregatePartition = 1136 (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) 1137 var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it))) 1138 var numPartitions = partiallyAggregated.partitions.length 1139 val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) 1140 // If creating an extra level doesn't help reduce 1141 // the wall-clock time, we stop tree aggregation. 
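      //
      // As a rough illustration (assumed numbers): with 1000 partitions and depth = 2,
      // scale = ceil(1000^(1/2)) = 32, so one round of the loop below combines the 1000
      // partially-aggregated partitions down to 1000 / 32 = 31 (integer division) before
      // the final reduce.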

      // Don't trigger TreeAggregation when it doesn't save wall-clock time
      while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
        numPartitions /= scale
        val curNumPartitions = numPartitions
        partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
          (i, iter) => iter.map((i % curNumPartitions, _))
        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
      }
      partiallyAggregated.reduce(cleanCombOp)
    }
  }

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

  /**
   * Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countApprox(
      timeout: Long,
      confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
    require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }
    val evaluator = new CountEvaluator(partitions.length, confidence)
    sc.runApproximateJob(this, countElements, evaluator, timeout)
  }

  /**
   * Return the count of each unique value in this RDD as a local map of (value, count) pairs.
   *
   * @note This method should only be used if the resulting map is expected to be small, as
   * the whole thing is loaded into the driver's memory.
   * To handle very large results, consider using
   *
   * {{{
   * rdd.map(x => (x, 1L)).reduceByKey(_ + _)
   * }}}
   *
   * which returns an RDD[(T, Long)] instead of a map.
   */
  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
    map(value => (value, null)).countByKey()
  }

  /**
   * Approximate version of countByValue().
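   *
   * A usage sketch (assumed timeout and confidence values):
   * {{{
   * val approx = rdd.countByValueApprox(10000L, 0.95)
   * val partial = approx.initialValue   // approximate counts once the timeout expires
   * val exact = approx.getFinalValue()  // blocks until the fully-computed result is available
   * }}}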
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countByValueApprox(timeout: Long, confidence: Double = 0.95)
      (implicit ord: Ordering[T] = null)
      : PartialResult[Map[T, BoundedDouble]] = withScope {
    require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
    if (elementClassTag.runtimeClass.isArray) {
      throw new SparkException("countByValueApprox() does not support arrays")
    }
    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) =>
      val map = new OpenHashMap[T, Long]
      iter.foreach {
        t => map.changeValue(t, 1L, _ + 1L)
      }
      map
    }
    val evaluator = new GroupedCountEvaluator[T](partitions.length, confidence)
    sc.runApproximateJob(this, countPartition, evaluator, timeout)
  }

  /**
   * Return approximate number of distinct elements in the RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp` (greater
   * than `p`) triggers a sparse representation of the registers, which may reduce the memory
   * consumption and increase accuracy when the cardinality is small.
   *
   * @param p The precision value for the normal set.
   *          `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
   * @param sp The precision value for the sparse set, between 0 and 32.
   *           If `sp` equals 0, the sparse representation is skipped.
   */
  def countApproxDistinct(p: Int, sp: Int): Long = withScope {
    require(p >= 4, s"p ($p) must be >= 4")
    require(sp <= 32, s"sp ($sp) must be <= 32")
    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
    val zeroCounter = new HyperLogLogPlus(p, sp)
    aggregate(zeroCounter)(
      (hll: HyperLogLogPlus, v: T) => {
        hll.offer(v)
        hll
      },
      (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
        h1.addAll(h2)
        h1
      }).cardinality()
  }

  /**
   * Return approximate number of distinct elements in the RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
  def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
    countApproxDistinct(if (p < 4) 4 else p, 0)
  }

  /**
   * Zips this RDD with its element indices. The ordering is first based on the partition index
   * and then the ordering of items within each partition. So the first item in the first
   * partition gets index 0, and the last item in the last partition receives the largest index.
   *
   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
   * This method needs to trigger a spark job when this RDD contains more than one partition.
   *
   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
   * elements in a partition. The index assigned to each element is therefore not guaranteed,
   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
   */
  def zipWithIndex(): RDD[(T, Long)] = withScope {
    new ZippedWithIndexRDD(this)
  }

  /**
   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
   * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
   *
   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
   * elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
   */
  def zipWithUniqueId(): RDD[(T, Long)] = withScope {
    val n = this.partitions.length.toLong
    this.mapPartitionsWithIndex { case (k, iter) =>
      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
        (item, i * n + k)
      }
    }
  }

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and then
   * using the results from that partition to estimate the number of additional partitions needed
   * to satisfy the limit.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @note Due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
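          //
          // Worked example with assumed numbers: if num = 100 and 2 scanned partitions produced
          // 10 rows so far, the estimate is (1.5 * 100 * 2 / 10) - 2 = 28 partitions, which is
          // then capped at partsScanned * scaleUpFactor = 2 * 4 = 8 for the next iteration.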
1343 if (buf.isEmpty) { 1344 numPartsToTry = partsScanned * scaleUpFactor 1345 } else { 1346 // the left side of max is >=1 whenever partsScanned >= 2 1347 numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1) 1348 numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor) 1349 } 1350 } 1351 1352 val left = num - buf.size 1353 val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) 1354 val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p) 1355 1356 res.foreach(buf ++= _.take(num - buf.size)) 1357 partsScanned += p.size 1358 } 1359 1360 buf.toArray 1361 } 1362 } 1363 1364 /** 1365 * Return the first element in this RDD. 1366 */ 1367 def first(): T = withScope { 1368 take(1) match { 1369 case Array(t) => t 1370 case _ => throw new UnsupportedOperationException("empty collection") 1371 } 1372 } 1373 1374 /** 1375 * Returns the top k (largest) elements from this RDD as defined by the specified 1376 * implicit Ordering[T] and maintains the ordering. This does the opposite of 1377 * [[takeOrdered]]. For example: 1378 * {{{ 1379 * sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1) 1380 * // returns Array(12) 1381 * 1382 * sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2) 1383 * // returns Array(6, 5) 1384 * }}} 1385 * 1386 * @note This method should only be used if the resulting array is expected to be small, as 1387 * all the data is loaded into the driver's memory. 1388 * 1389 * @param num k, the number of top elements to return 1390 * @param ord the implicit ordering for T 1391 * @return an array of top elements 1392 */ 1393 def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { 1394 takeOrdered(num)(ord.reverse) 1395 } 1396 1397 /** 1398 * Returns the first k (smallest) elements from this RDD as defined by the specified 1399 * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]]. 1400 * For example: 1401 * {{{ 1402 * sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1) 1403 * // returns Array(2) 1404 * 1405 * sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2) 1406 * // returns Array(2, 3) 1407 * }}} 1408 * 1409 * @note This method should only be used if the resulting array is expected to be small, as 1410 * all the data is loaded into the driver's memory. 1411 * 1412 * @param num k, the number of elements to return 1413 * @param ord the implicit ordering for T 1414 * @return an array of top elements 1415 */ 1416 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { 1417 if (num == 0) { 1418 Array.empty 1419 } else { 1420 val mapRDDs = mapPartitions { items => 1421 // Priority keeps the largest elements, so let's reverse the ordering. 1422 val queue = new BoundedPriorityQueue[T](num)(ord.reverse) 1423 queue ++= util.collection.Utils.takeOrdered(items, num)(ord) 1424 Iterator.single(queue) 1425 } 1426 if (mapRDDs.partitions.length == 0) { 1427 Array.empty 1428 } else { 1429 mapRDDs.reduce { (queue1, queue2) => 1430 queue1 ++= queue2 1431 queue1 1432 }.toArray.sorted(ord) 1433 } 1434 } 1435 } 1436 1437 /** 1438 * Returns the max of this RDD as defined by the implicit Ordering[T]. 1439 * @return the maximum element of the RDD 1440 * */ 1441 def max()(implicit ord: Ordering[T]): T = withScope { 1442 this.reduce(ord.max) 1443 } 1444 1445 /** 1446 * Returns the min of this RDD as defined by the implicit Ordering[T]. 
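   * For example, `sc.parallelize(Seq(10, 4, 2)).min()` returns `2` (a sketch with assumed data).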
1447 * @return the minimum element of the RDD
1448 * */
1449 def min()(implicit ord: Ordering[T]): T = withScope {
1450 this.reduce(ord.min)
1451 }
1452
1453 /**
1454 * @note Due to complications in the internal implementation, this method will raise an
1455 * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
1456 * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
1457 * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
1458 * @return true if and only if the RDD contains no elements at all. Note that an RDD
1459 * may be empty even when it has at least 1 partition.
1460 */
1461 def isEmpty(): Boolean = withScope {
1462 partitions.length == 0 || take(1).length == 0
1463 }
1464
1465 /**
1466 * Save this RDD as a text file, using string representations of elements.
1467 */
1468 def saveAsTextFile(path: String): Unit = withScope {
1469 // https://issues.apache.org/jira/browse/SPARK-2075
1470 //
1471 // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
1472 // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
1473 // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
1474 // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
1475 // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
1476 //
1477 // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generates
1478 // the same bytecode for `saveAsTextFile`.
1479 val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
1480 val textClassTag = implicitly[ClassTag[Text]]
1481 val r = this.mapPartitions { iter =>
1482 val text = new Text()
1483 iter.map { x =>
1484 text.set(x.toString)
1485 (NullWritable.get(), text)
1486 }
1487 }
1488 RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
1489 .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
1490 }
1491
1492 /**
1493 * Save this RDD as a compressed text file, using string representations of elements.
1494 */
1495 def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
1496 // https://issues.apache.org/jira/browse/SPARK-2075
1497 val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
1498 val textClassTag = implicitly[ClassTag[Text]]
1499 val r = this.mapPartitions { iter =>
1500 val text = new Text()
1501 iter.map { x =>
1502 text.set(x.toString)
1503 (NullWritable.get(), text)
1504 }
1505 }
1506 RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
1507 .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
1508 }
1509
1510 /**
1511 * Save this RDD as a SequenceFile of serialized objects.
1512 */
1513 def saveAsObjectFile(path: String): Unit = withScope {
1514 this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
1515 .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
1516 .saveAsSequenceFile(path)
1517 }
1518
1519 /**
1520 * Creates tuples of the elements in this RDD by applying `f`.
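*
* For example, assuming a `SparkContext` available as `sc`:
* {{{
*   sc.parallelize(Seq("apple", "banana", "fig")).keyBy(_.length)
*   // yields an RDD of (5, "apple"), (6, "banana"), (3, "fig")
* }}}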
1521 */
1522 def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
1523 val cleanedF = sc.clean(f)
1524 map(x => (cleanedF(x), x))
1525 }
1526
1527 /** A private method for tests, to look at the contents of each partition */
1528 private[spark] def collectPartitions(): Array[Array[T]] = withScope {
1529 sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
1530 }
1531
1532 /**
1533 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
1534 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
1535 * RDDs will be removed. This function must be called before any job has been
1536 * executed on this RDD. It is strongly recommended that this RDD be persisted in
1537 * memory, otherwise saving it to a file will require recomputation.
1538 */
1539 def checkpoint(): Unit = RDDCheckpointData.synchronized {
1540 // NOTE: we use a global lock here due to complexities downstream with ensuring
1541 // children RDD partitions point to the correct parent partitions. In the future
1542 // we should revisit this consideration.
1543 if (context.checkpointDir.isEmpty) {
1544 throw new SparkException("Checkpoint directory has not been set in the SparkContext")
1545 } else if (checkpointData.isEmpty) {
1546 checkpointData = Some(new ReliableRDDCheckpointData(this))
1547 }
1548 }
1549
1550 /**
1551 * Mark this RDD for local checkpointing using Spark's existing caching layer.
1552 *
1553 * This method is for users who wish to truncate RDD lineages while skipping the expensive
1554 * step of replicating the materialized data in a reliable distributed file system. This is
1555 * useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
1556 *
1557 * Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
1558 * data is written to ephemeral local storage in the executors instead of to a reliable,
1559 * fault-tolerant storage. The effect is that if an executor fails during the computation,
1560 * the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
1561 *
1562 * This is NOT safe to use with dynamic allocation, which removes executors along
1563 * with their cached blocks. If you must use both features, you are advised to set
1564 * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
1565 *
1566 * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
1567 */
1568 def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
1569 if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
1570 conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
1571 logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
1572 "which removes executors along with their cached blocks. If you must use both " +
1573 "features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +
1574 "to a high value. E.g. if you plan to use the RDD for 1 hour, set the timeout to " +
1575 "at least 1 hour.")
1576 }
1577
1578 // Note: At this point we do not actually know whether the user will call persist() on
1579 // this RDD later, so we must explicitly call it here ourselves to ensure the cached
1580 // blocks are registered for cleanup later in the SparkContext.
1581 //
1582 // If, however, the user has already called persist() on this RDD, then we must adapt
1583 // the storage level they specified to one that is appropriate for local checkpointing
1584 // (i.e. uses disk) to guarantee correctness.
1585
1586 if (storageLevel == StorageLevel.NONE) {
1587 persist(LocalRDDCheckpointData.DEFAULT_STORAGE_LEVEL)
1588 } else {
1589 persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
1590 }
1591
1592 // If this RDD is already checkpointed and materialized, its lineage is already truncated.
1593 // We must not override our `checkpointData` in this case because it is needed to recover
1594 // the checkpointed data. If it is overridden, the next time this RDD is materialized it
1595 // will cause an error.
1596 if (isCheckpointedAndMaterialized) {
1597 logWarning("Not marking RDD for local checkpoint because it was already " +
1598 "checkpointed and materialized")
1599 } else {
1600 // Lineage is not truncated yet, so just override any existing checkpoint data with ours
1601 checkpointData match {
1602 case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
1603 "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
1604 case _ =>
1605 }
1606 checkpointData = Some(new LocalRDDCheckpointData(this))
1607 }
1608 this
1609 }
1610
1611 /**
1612 * Return whether this RDD is checkpointed and materialized, either reliably or locally.
1613 */
1614 def isCheckpointed: Boolean = isCheckpointedAndMaterialized
1615
1616 /**
1617 * Return whether this RDD is checkpointed and materialized, either reliably or locally.
1618 * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
1619 * return value. Exposed for testing.
1620 */
1621 private[spark] def isCheckpointedAndMaterialized: Boolean =
1622 checkpointData.exists(_.isCheckpointed)
1623
1624 /**
1625 * Return whether this RDD is marked for local checkpointing.
1626 * Exposed for testing.
1627 */
1628 private[rdd] def isLocallyCheckpointed: Boolean = {
1629 checkpointData match {
1630 case Some(_: LocalRDDCheckpointData[T]) => true
1631 case _ => false
1632 }
1633 }
1634
1635 /**
1636 * Gets the name of the directory to which this RDD was checkpointed.
1637 * This is not defined if the RDD is checkpointed locally.
1638 */
1639 def getCheckpointFile: Option[String] = {
1640 checkpointData match {
1641 case Some(reliable: ReliableRDDCheckpointData[T]) => reliable.getCheckpointDir
1642 case _ => None
1643 }
1644 }
1645
1646 // =======================================================================
1647 // Other internal methods and fields
1648 // =======================================================================
1649
1650 private var storageLevel: StorageLevel = StorageLevel.NONE
1651
1652 /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
1653 @transient private[spark] val creationSite = sc.getCallSite()
1654
1655 /**
1656 * The scope associated with the operation that created this RDD.
1657 *
1658 * This is more flexible than the call site and can be defined hierarchically. For more
1659 * detail, see the documentation of {{RDDOperationScope}}. This scope is not defined if the
1660 * user instantiates this RDD directly without using any Spark operations.
1661 */
1662 @transient private[spark] val scope: Option[RDDOperationScope] = {
1663 Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
1664 }
1665
1666 private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
1667
1668 private[spark] def elementClassTag: ClassTag[T] = classTag[T]
1669
1670 private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
1671
1672 // Whether to checkpoint all ancestor RDDs that are marked for checkpointing. By default,
1673 // we stop as soon as we find the first such RDD, an optimization that allows us to write
1674 // less data but is not safe for all workloads. E.g. in streaming we may checkpoint both
1675 // an RDD and its parent in every batch, in which case the parent may never be checkpointed
1676 // and its lineage never truncated, leading to OOMs in the long run (SPARK-6847).
1677 private val checkpointAllMarkedAncestors =
1678 Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS))
1679 .map(_.toBoolean).getOrElse(false)
1680
1681 /** Returns the first parent RDD */
1682 protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
1683 dependencies.head.rdd.asInstanceOf[RDD[U]]
1684 }
1685
1686 /** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */
1687 protected[spark] def parent[U: ClassTag](j: Int) = {
1688 dependencies(j).rdd.asInstanceOf[RDD[U]]
1689 }
1690
1691 /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
1692 def context: SparkContext = sc
1693
1694 /**
1695 * Private API for changing an RDD's ClassTag.
1696 * Used for internal Java-Scala API compatibility.
1697 */
1698 private[spark] def retag(cls: Class[T]): RDD[T] = {
1699 val classTag: ClassTag[T] = ClassTag.apply(cls)
1700 this.retag(classTag)
1701 }
1702
1703 /**
1704 * Private API for changing an RDD's ClassTag.
1705 * Used for internal Java-Scala API compatibility.
1706 */
1707 private[spark] def retag(implicit classTag: ClassTag[T]): RDD[T] = {
1708 this.mapPartitions(identity, preservesPartitioning = true)(classTag)
1709 }
1710
1711 // Avoid handling doCheckpoint multiple times to prevent excessive recursion
1712 @transient private var doCheckpointCalled = false
1713
1714 /**
1715 * Performs the checkpointing of this RDD by saving its contents. It is called after a job
1716 * using this RDD has completed (therefore the RDD has been materialized and potentially
1717 * stored in memory). doCheckpoint() is called recursively on the parent RDDs.
1718 */
1719 private[spark] def doCheckpoint(): Unit = {
1720 RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
1721 if (!doCheckpointCalled) {
1722 doCheckpointCalled = true
1723 if (checkpointData.isDefined) {
1724 if (checkpointAllMarkedAncestors) {
1725 // TODO We can collect all the RDDs that need to be checkpointed, and then checkpoint
1726 // them in parallel.
1727 // Checkpoint parents first because our lineage will be truncated after we
1728 // checkpoint ourselves
1729 dependencies.foreach(_.rdd.doCheckpoint())
1730 }
1731 checkpointData.get.checkpoint()
1732 } else {
1733 dependencies.foreach(_.rdd.doCheckpoint())
1734 }
1735 }
1736 }
1737 }
1738
1739 /**
1740 * Changes the dependencies of this RDD from its original parents to the new RDD
1741 * created from the checkpoint file, and forgets its old dependencies and partitions.
1742 */ 1743 private[spark] def markCheckpointed(): Unit = { 1744 clearDependencies() 1745 partitions_ = null 1746 deps = null // Forget the constructor argument for dependencies too 1747 } 1748 1749 /** 1750 * Clears the dependencies of this RDD. This method must ensure that all references 1751 * to the original parent RDDs are removed to enable the parent RDDs to be garbage 1752 * collected. Subclasses of RDD may override this method for implementing their own cleaning 1753 * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example. 1754 */ 1755 protected def clearDependencies() { 1756 dependencies_ = null 1757 } 1758 1759 /** A description of this RDD and its recursive dependencies for debugging. */ 1760 def toDebugString: String = { 1761 // Get a debug description of an rdd without its children 1762 def debugSelf(rdd: RDD[_]): Seq[String] = { 1763 import Utils.bytesToString 1764 1765 val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else "" 1766 val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info => 1767 " CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format( 1768 info.numCachedPartitions, bytesToString(info.memSize), 1769 bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize))) 1770 1771 s"$rdd [$persistence]" +: storageInfo 1772 } 1773 1774 // Apply a different rule to the last child 1775 def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = { 1776 val len = rdd.dependencies.length 1777 len match { 1778 case 0 => Seq.empty 1779 case 1 => 1780 val d = rdd.dependencies.head 1781 debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]], true) 1782 case _ => 1783 val frontDeps = rdd.dependencies.take(len - 1) 1784 val frontDepStrings = frontDeps.flatMap( 1785 d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]])) 1786 1787 val lastDep = rdd.dependencies.last 1788 val lastDepStrings = 1789 debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_, _, _]], true) 1790 1791 (frontDepStrings ++ lastDepStrings) 1792 } 1793 } 1794 // The first RDD in the dependency stack has no parents, so no need for a +- 1795 def firstDebugString(rdd: RDD[_]): Seq[String] = { 1796 val partitionStr = "(" + rdd.partitions.length + ")" 1797 val leftOffset = (partitionStr.length - 1) / 2 1798 val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)) 1799 1800 debugSelf(rdd).zipWithIndex.map{ 1801 case (desc: String, 0) => s"$partitionStr $desc" 1802 case (desc: String, _) => s"$nextPrefix $desc" 1803 } ++ debugChildren(rdd, nextPrefix) 1804 } 1805 def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = { 1806 val partitionStr = "(" + rdd.partitions.length + ")" 1807 val leftOffset = (partitionStr.length - 1) / 2 1808 val thisPrefix = prefix.replaceAll("\\|\\s+$", "") 1809 val nextPrefix = ( 1810 thisPrefix 1811 + (if (isLastChild) " " else "| ") 1812 + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))) 1813 1814 debugSelf(rdd).zipWithIndex.map{ 1815 case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc" 1816 case (desc: String, _) => s"$nextPrefix$desc" 1817 } ++ debugChildren(rdd, nextPrefix) 1818 } 1819 def debugString( 1820 rdd: RDD[_], 1821 prefix: String = "", 1822 isShuffle: Boolean = true, 1823 isLastChild: Boolean = false): Seq[String] = { 1824 if (isShuffle) { 1825 shuffleDebugString(rdd, prefix, isLastChild) 1826 } else { 1827 
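// Narrow (non-shuffle) dependencies are rendered at the current indentation level,
// without the "+-" marker that shuffleDebugString adds at shuffle boundaries.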
debugSelf(rdd).map(prefix + _) ++ debugChildren(rdd, prefix)
1828 }
1829 }
1830 firstDebugString(this).mkString("\n")
1831 }
1832
1833 override def toString: String = "%s%s[%d] at %s".format(
1834 Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
1835
1836 def toJavaRDD(): JavaRDD[T] = {
1837 new JavaRDD(this)(elementClassTag)
1838 }
1839}
1840
1841
1842/**
1843 * Defines implicit functions that provide extra functionality on RDDs of specific types.
1844 *
1845 * For example, [[RDD.rddToPairRDDFunctions]] converts an RDD of key-value pairs into a
1846 * [[PairRDDFunctions]], enabling extra functionality such as [[PairRDDFunctions.reduceByKey]].
1847 */
1848object RDD {
1849
1850 private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS =
1851 "spark.checkpoint.checkpointAllMarkedAncestors"
1852
1853 // The following implicit functions were in SparkContext before 1.3 and users had to
1854 // `import SparkContext._` to enable them. Now we move them here to make the compiler find
1855 // them automatically. However, we still keep the old functions in SparkContext for backward
1856 // compatibility and forward to the following functions directly.
1857
1858 implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
1859 (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
1860 new PairRDDFunctions(rdd)
1861 }
1862
1863 implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
1864 new AsyncRDDActions(rdd)
1865 }
1866
1867 implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
1868 (implicit kt: ClassTag[K], vt: ClassTag[V],
1869 keyWritableFactory: WritableFactory[K],
1870 valueWritableFactory: WritableFactory[V])
1871 : SequenceFileRDDFunctions[K, V] = {
1872 implicit val keyConverter = keyWritableFactory.convert
1873 implicit val valueConverter = valueWritableFactory.convert
1874 new SequenceFileRDDFunctions(rdd,
1875 keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
1876 }
1877
1878 implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
1879 : OrderedRDDFunctions[K, V, (K, V)] = {
1880 new OrderedRDDFunctions[K, V, (K, V)](rdd)
1881 }
1882
1883 implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = {
1884 new DoubleRDDFunctions(rdd)
1885 }
1886
1887 implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T])
1888 : DoubleRDDFunctions = {
1889 new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
1890 }
1891}
1892
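// A brief sketch of what these implicit conversions enable, assuming a live SparkContext
// available as `sc`: an RDD[(String, Int)] picks up PairRDDFunctions automatically, so
// key-value operations such as reduceByKey can be called directly:
//
//   val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))).reduceByKey(_ + _)
//   // counts contains ("a", 2) and ("b", 1)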