/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, HashMap => JHashMap, Locale}

import scala.collection.{mutable, Map}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.DynamicVariable

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable {

  /**
   * :: Experimental ::
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
   *
   * Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   *
   * @note V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]).
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  }

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   * This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  }

  /**
   * :: Experimental ::
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      new HashPartitioner(numPartitions))
  }
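
  // A minimal usage sketch of combineByKey (illustrative only; `sc` stands for an assumed
  // in-scope SparkContext): collect the distinct values seen per key into a Set.
  //
  //   val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 1), ("a", 3)))
  //   val valueSets = pairs.combineByKey(
  //     (v: Int) => Set(v),                        // createCombiner: start a one-element set
  //     (s: Set[Int], v: Int) => s + v,            // mergeValue: fold a value into the set
  //     (s1: Set[Int], s2: Set[Int]) => s1 ++ s2)  // mergeCombiners: union two partial sets
  //   // valueSets.collect() yields ("a", Set(1, 3)) and ("b", Set(2)), modulo ordering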

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
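
  // A usage sketch of aggregateByKey (illustrative only; assumes an in-scope SparkContext `sc`):
  // compute a per-key (sum, count) pair, from which an average can be derived. The zero value is
  // cloned per key, so mutating-style accumulation is safe.
  //
  //   val xs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 4.0)))
  //   val sumCounts = xs.aggregateByKey((0.0, 0L))(
  //     (acc, v) => (acc._1 + v, acc._2 + 1),    // seqOp: merge a value within a partition
  //     (a, b) => (a._1 + b._1, a._2 + b._2))    // combOp: merge partials across partitions
  //   val means = sumCounts.mapValues { case (sum, n) => sum / n }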

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    // When deserializing, use a lazy val to create just one instance of the serializer per task
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

    val cleanedFunc = self.context.clean(func)
    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
      cleanedFunc, cleanedFunc, partitioner)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
  }

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
   * RDD, to produce a sample of size that's approximately equal to the sum of
   * math.ceil(numItems * samplingRate) over all key values.
   *
   * @param withReplacement whether to sample with or without replacement
   * @param fractions map of specific keys to sampling rates
   * @param seed seed for the random number generator
   * @return RDD containing the sampled subset
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: Map[K, Double],
      seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {

    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

    val samplingFunc = if (withReplacement) {
      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
    } else {
      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
    }
    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
  }
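
  // A usage sketch of sampleByKey (illustrative only; assumes an in-scope SparkContext `sc`):
  // keep roughly 10% of the "a" records and 50% of the "b" records in a single pass.
  //
  //   val data = sc.parallelize(1 to 1000).map(i => (if (i % 2 == 0) "a" else "b", i))
  //   val fractions = Map("a" -> 0.1, "b" -> 0.5)
  //   val sample = data.sampleByKey(withReplacement = false, fractions = fractions, seed = 42L)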

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
   * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
   *
   * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
   * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
   * over all key values with a 99.99% confidence. When sampling without replacement, we need one
   * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
   * two additional passes.
   *
   * @param withReplacement whether to sample with or without replacement
   * @param fractions map of specific keys to sampling rates
   * @param seed seed for the random number generator
   * @return RDD containing the sampled subset
   */
  def sampleByKeyExact(
      withReplacement: Boolean,
      fractions: Map[K, Double],
      seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {

    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

    val samplingFunc = if (withReplacement) {
      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, true, seed)
    } else {
      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
    }
    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing
   * partitioner/parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
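
  // The canonical reduceByKey usage sketch (illustrative only; `sc` is an assumed SparkContext
  // and the input path is a placeholder): word count, where the addition runs map-side before
  // any data is shuffled.
  //
  //   val counts = sc.textFile("hdfs://host/path/to/input")  // placeholder path
  //     .flatMap(_.split("\\s+"))
  //     .map(word => (word, 1L))
  //     .reduceByKey(_ + _)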

  /**
   * Merge the values for each key using an associative and commutative reduce function, but return
   * the results immediately to the master as a Map. This will also perform the merging locally on
   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
    val cleanedF = self.sparkContext.clean(func)

    if (keyClass.isArray) {
      throw new SparkException("reduceByKeyLocally() does not support array keys")
    }

    val reducePartition = (iter: Iterator[(K, V)]) => {
      val map = new JHashMap[K, V]
      iter.foreach { pair =>
        val old = map.get(pair._1)
        map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
      }
      Iterator(map)
    } : Iterator[JHashMap[K, V]]

    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
      m2.asScala.foreach { pair =>
        val old = m1.get(pair._1)
        m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
      }
      m1
    } : JHashMap[K, V]

    self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
  }

  /**
   * Count the number of elements for each key, collecting the results to a local Map.
   *
   * @note This method should only be used if the resulting map is expected to be small, as
   * the whole thing is loaded into the driver's memory.
   * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
   * returns an RDD[(K, Long)] instead of a map.
   */
  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }

  /**
   * Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
      : PartialResult[Map[K, BoundedDouble]] = self.withScope {
    self.map(_._1).countByValueApprox(timeout, confidence)
  }
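
  // A usage sketch contrasting countByKey with its scalable equivalent (illustrative only;
  // assumes an in-scope SparkContext `sc`):
  //
  //   val events = sc.parallelize(Seq(("click", 1), ("view", 1), ("click", 2)))
  //   val small: Map[String, Long] = events.countByKey()        // fine for few distinct keys
  //   val large = events.mapValues(_ => 1L).reduceByKey(_ + _)  // stays distributed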

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp` (greater
   * than `p`) triggers a sparse representation of the registers, which may reduce memory
   * consumption and increase accuracy when the cardinality is small.
   *
   * @param p The precision value for the normal set.
   *          `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
   * @param sp The precision value for the sparse set, between 0 and 32.
   *           If `sp` equals 0, the sparse representation is skipped.
   * @param partitioner Partitioner to use for the resulting RDD.
   */
  def countApproxDistinctByKey(
      p: Int,
      sp: Int,
      partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
    require(p >= 4, s"p ($p) must be >= 4")
    require(sp <= 32, s"sp ($sp) must be <= 32")
    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
    val createHLL = (v: V) => {
      val hll = new HyperLogLogPlus(p, sp)
      hll.offer(v)
      hll
    }
    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
      hll.offer(v)
      hll
    }
    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
      h1.addAll(h2)
      h1
    }

    combineByKeyWithClassTag(createHLL, mergeValueHLL, mergeHLL, partitioner)
      .mapValues(_.cardinality())
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param partitioner partitioner of the resulting RDD
   */
  def countApproxDistinctByKey(
      relativeSD: Double,
      partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
    assert(p <= 32)
    countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param numPartitions number of partitions of the resulting RDD
   */
  def countApproxDistinctByKey(
      relativeSD: Double,
      numPartitions: Int): RDD[(K, Long)] = self.withScope {
    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = self.withScope {
    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
  }
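
  // A usage sketch of countApproxDistinctByKey (illustrative only; assumes an in-scope
  // SparkContext `sc`): estimate distinct users per page, with ~5% relative error by default.
  //
  //   val visits = sc.parallelize(Seq(("page1", "u1"), ("page1", "u2"), ("page1", "u1")))
  //   val approx = visits.countApproxDistinctByKey()       // relativeSD defaults to 0.05
  //   val tighter = visits.countApproxDistinctByKey(0.01)  // more registers, more memory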

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   * The ordering of elements within each group is not guaranteed, and may even differ
   * each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for
   * any key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD into `numPartitions` partitions. The ordering of elements within
   * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for
   * any key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
   */
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }
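
  // A sketch of how pre-partitioning interacts with join (illustrative only; assumes an in-scope
  // SparkContext `sc`): joining against the same partitioner reuses the existing layout of the
  // already-partitioned side instead of re-shuffling it.
  //
  //   val part = new HashPartitioner(8)
  //   val users = sc.parallelize(Seq((1, "alice"), (2, "bob"))).partitionBy(part).cache()
  //   val orders = sc.parallelize(Seq((1, "order-9"), (1, "order-7")))
  //   val joined = users.join(orders, part)  // users' partitioning is reused, not re-shuffled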

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def leftOuterJoin[W](
      other: RDD[(K, W)],
      partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._2.isEmpty) {
        pair._1.iterator.map(v => (v, None))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
      }
    }
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._1.isEmpty) {
        pair._2.iterator.map(w => (None, w))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
      }
    }
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
   */
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
      case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
      case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
    }
  }

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using
   * the existing partitioner/parallelism level. This method is here for backward compatibility.
   * It does not provide combiner classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

  /**
   * :: Experimental ::
   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using
   * the existing partitioner/parallelism level.
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }
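
  // A small sketch of the three outer-join shapes (illustrative only; assumes an in-scope
  // SparkContext `sc`; output order is not guaranteed):
  //
  //   val left = sc.parallelize(Seq((1, "a"), (2, "b")))
  //   val right = sc.parallelize(Seq((2, "x"), (3, "y")))
  //   left.leftOuterJoin(right).collect()
  //   //   (1, ("a", None)), (2, ("b", Some("x")))
  //   left.rightOuterJoin(right).collect()
  //   //   (2, (Some("b"), "x")), (3, (None, "y"))
  //   left.fullOuterJoin(right).collect()
  //   //   (1, (Some("a"), None)), (2, (Some("b"), Some("x"))), (3, (None, Some("y")))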

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
   * within each group is not guaranteed, and may even differ each time the resulting RDD is
   * evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   */
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    join(other, defaultPartitioner(self, other))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
    join(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * using the existing partitioner/parallelism level.
   */
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope {
    leftOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * into `numPartitions` partitions.
   */
  def leftOuterJoin[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (V, Option[W]))] = self.withScope {
    leftOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD using the existing partitioner/parallelism level.
   */
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope {
    rightOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD into the given number of partitions.
   */
  def rightOuterJoin[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Option[V], W))] = self.withScope {
    rightOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD using the existing
   * partitioner/parallelism level.
   */
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = self.withScope {
    fullOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
   */
  def fullOuterJoin[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = self.withScope {
    fullOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   *
   * Warning: this doesn't return a multimap (so if you have multiple values for the same key,
   * only one value per key is preserved in the map returned)
   *
   * @note this method should only be used if the resulting data is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collectAsMap(): Map[K, V] = self.withScope {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
  }

  /**
   * Pass each value in the key-value pair RDD through a map function without changing the keys;
   * this also retains the original RDD's partitioning.
   */
  def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
      preservesPartitioning = true)
  }

  /**
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
   */
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.flatMap { case (k, v) =>
        cleanF(v).map(x => (k, x))
      },
      preservesPartitioning = true)
  }
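
  // A usage sketch of mapValues and flatMapValues (illustrative only; assumes an in-scope
  // SparkContext `sc`): both preserve the partitioner because keys are left untouched.
  //
  //   val kv = sc.parallelize(Seq(("a", "x y"), ("b", "z")))
  //   val upper = kv.mapValues(_.toUpperCase)      // ("a", "X Y"), ("b", "Z")
  //   val tokens = kv.flatMapValues(_.split(" "))  // ("a", "x"), ("a", "y"), ("b", "z")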

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
      (vs.asInstanceOf[Iterable[V]],
        w1s.asInstanceOf[Iterable[W1]],
        w2s.asInstanceOf[Iterable[W2]],
        w3s.asInstanceOf[Iterable[W3]])
    }
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s) =>
      (vs.asInstanceOf[Iterable[V]],
        w1s.asInstanceOf[Iterable[W1]],
        w2s.asInstanceOf[Iterable[W2]])
    }
  }

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    cogroup(other, defaultPartitioner(self, other))
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }
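
  // A usage sketch of cogroup (illustrative only; assumes an in-scope SparkContext `sc`):
  // every key that appears in either RDD shows up once, paired with both value groups.
  //
  //   val a = sc.parallelize(Seq((1, "a1"), (1, "a2"), (2, "b")))
  //   val b = sc.parallelize(Seq((1, "x"), (3, "y")))
  //   a.cogroup(b).collect()
  //   //   (1, (Iterable("a1", "a2"), Iterable("x")))
  //   //   (2, (Iterable("b"), Iterable()))
  //   //   (3, (Iterable(), Iterable("y")))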

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    cogroup(other, new HashPartitioner(numPartitions))
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    cogroup(other1, other2, new HashPartitioner(numPartitions))
  }

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      numPartitions: Int)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
  }

  /** Alias for cogroup. */
  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    cogroup(other, defaultPartitioner(self, other))
  }

  /** Alias for cogroup. */
  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

  /** Alias for cogroup. */
  def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   *
   * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be no larger than this one.
   */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope {
    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W: ClassTag](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, V)] = self.withScope {
    subtractByKey(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope {
    new SubtractedRDD[K, V, W](self, other, p)
  }
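
  // A usage sketch of subtractByKey (illustrative only; assumes an in-scope SparkContext `sc`):
  // drop every pair whose key also occurs in `other`, regardless of the other side's values.
  //
  //   val all = sc.parallelize(Seq((1, "keep"), (2, "drop"), (1, "keep too")))
  //   val toRemove = sc.parallelize(Seq((2, ())))
  //   all.subtractByKey(toRemove).collect()  // (1, "keep"), (1, "keep too")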

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): Seq[V] = self.withScope {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for (pair <- it if pair._1 == key) {
            buf += pair._2
          }
          buf
        } : Seq[V]
        val res = self.context.runJob(self, process, Array(index))
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress the result with the
   * supplied codec.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String,
      codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
    val runtimeClass = fm.runtimeClass
    saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
    saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = NewAPIHadoopJob.getInstance(hadoopConf)
    job.setOutputKeyClass(keyClass)
    job.setOutputValueClass(valueClass)
    job.setOutputFormatClass(outputFormatClass)
    val jobConfiguration = job.getConfiguration
    jobConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDataset(jobConfiguration)
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      codec: Class[_ <: CompressionCodec]): Unit = self.withScope {
    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
      new JobConf(self.context.hadoopConfiguration), Some(codec))
  }
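
  // A usage sketch of the class-tag based save helper (illustrative only; assumes an in-scope
  // SparkContext `sc`, Hadoop's TextOutputFormat on the classpath, and a writable output path):
  //
  //   import org.apache.hadoop.io.{LongWritable, Text}
  //   import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
  //   val pairs = sc.parallelize(Seq((new LongWritable(1L), new Text("one"))))
  //   pairs.saveAsNewAPIHadoopFile[NewTextOutputFormat[LongWritable, Text]]("/tmp/out")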

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   *
   * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
   * not use an output committer that writes data directly.
   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
   * result of using a direct output committer with speculation enabled.
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    hadoopConf.setOutputKeyClass(keyClass)
    hadoopConf.setOutputValueClass(valueClass)
    hadoopConf.setOutputFormat(outputFormatClass)
    for (c <- codec) {
      hadoopConf.setCompressMapOutput(true)
      hadoopConf.set("mapred.output.compress", "true")
      hadoopConf.setMapOutputCompressorClass(c)
      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }

    // Use configured output committer if already set
    if (hadoopConf.getOutputCommitter == null) {
      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
    }

    // When speculation is on and the output committer class name contains "Direct", we should
    // warn users that they may lose data if they are using a direct output committer.
    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    FileOutputFormat.setOutputPath(hadoopConf,
      SparkHadoopWriter.createPathFromString(path, hadoopConf))
    saveAsHadoopDataset(hadoopConf)
  }
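
  // A usage sketch of the old-API save path with compression (illustrative only; assumes an
  // in-scope SparkContext `sc` and a writable output path):
  //
  //   import org.apache.hadoop.io.compress.GzipCodec
  //   import org.apache.hadoop.mapred.TextOutputFormat
  //   val pairs = sc.parallelize(Seq(("k", "v")))
  //   pairs.saveAsHadoopFile("/tmp/out-gz", classOf[String], classOf[String],
  //     classOf[TextOutputFormat[String, String]], classOf[GzipCodec])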

  /**
   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
   * Configuration object for that storage system. The Conf should set an OutputFormat and any
   * output paths required (e.g. a table name to write to) in the same way as it would be
   * configured for a Hadoop MapReduce job.
   *
   * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
   * not use an output committer that writes data directly.
   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
   * result of using a direct output committer with speculation enabled.
   */
  def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = NewAPIHadoopJob.getInstance(hadoopConf)
    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    val jobtrackerID = formatter.format(new Date())
    val stageId = self.id
    val jobConfiguration = job.getConfiguration
    val wrappedConf = new SerializableConfiguration(jobConfiguration)
    val outfmt = job.getOutputFormatClass
    val jobFormat = outfmt.newInstance

    if (isOutputSpecValidationEnabled) {
      // FileOutputFormat ignores the filesystem parameter
      jobFormat.checkOutputSpecs(job)
    }

    val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
      val config = wrappedConf.value
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE,
        context.partitionId, context.attemptNumber)
      val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
      val format = outfmt.newInstance
      format match {
        case c: Configurable => c.setConf(config)
        case _ => ()
      }
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)

      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
        initHadoopOutputMetrics(context)

      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
      require(writer != null, "Unable to obtain RecordWriter")
      var recordsWritten = 0L
      Utils.tryWithSafeFinallyAndFailureCallbacks {
        while (iter.hasNext) {
          val pair = iter.next()
          writer.write(pair._1, pair._2)

          // Update bytes written metric every few records
          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
          recordsWritten += 1
        }
      }(finallyBlock = writer.close(hadoopContext))
      committer.commitTask(hadoopContext)
      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
        om.setBytesWritten(callback())
        om.setRecordsWritten(recordsWritten)
      }
      1
    } : Int

    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)

    // When speculation is on and the output committer class name contains "Direct", we should
    // warn users that they may lose data if they are using a direct output committer.
    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = jobCommitter.getClass.getSimpleName
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    jobCommitter.setupJob(jobTaskContext)
    self.context.runJob(self, writeShard)
    jobCommitter.commitJob(jobTaskContext)
  }
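
  // A configuration sketch for saveAsNewAPIHadoopDataset (illustrative only; assumes an
  // in-scope SparkContext `sc`; the output path is a placeholder):
  //
  //   import org.apache.hadoop.fs.Path
  //   import org.apache.hadoop.io.Text
  //   import org.apache.hadoop.mapreduce.Job
  //   import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
  //   val job = Job.getInstance(sc.hadoopConfiguration)
  //   job.setOutputKeyClass(classOf[Text])
  //   job.setOutputValueClass(classOf[Text])
  //   job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]])
  //   FileOutputFormat.setOutputPath(job, new Path("/tmp/dataset-out"))
  //   val pairs = sc.parallelize(Seq((new Text("k"), new Text("v"))))
  //   pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)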

  /**
   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
   * that storage system. The JobConf should set an OutputFormat and any output paths required
   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
   * MapReduce job.
   */
  def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val outputFormatInstance = hadoopConf.getOutputFormat
    val keyClass = hadoopConf.getOutputKeyClass
    val valueClass = hadoopConf.getOutputValueClass
    if (outputFormatInstance == null) {
      throw new SparkException("Output format class not set")
    }
    if (keyClass == null) {
      throw new SparkException("Output key class not set")
    }
    if (valueClass == null) {
      throw new SparkException("Output value class not set")
    }
    SparkHadoopUtil.get.addCredentials(hadoopConf)

    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
      valueClass.getSimpleName + ")")

    if (isOutputSpecValidationEnabled) {
      // FileOutputFormat ignores the filesystem parameter
      val ignoredFs = FileSystem.get(hadoopConf)
      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
    }

    val writer = new SparkHadoopWriter(hadoopConf)
    writer.preSetup()

    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
        initHadoopOutputMetrics(context)

      writer.setup(context.stageId, context.partitionId, taskAttemptId)
      writer.open()
      var recordsWritten = 0L

      Utils.tryWithSafeFinallyAndFailureCallbacks {
        while (iter.hasNext) {
          val record = iter.next()
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

          // Update bytes written metric every few records
          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
          recordsWritten += 1
        }
      }(finallyBlock = writer.close())
      writer.commit()
      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
        om.setBytesWritten(callback())
        om.setRecordsWritten(recordsWritten)
      }
    }

    self.context.runJob(self, writeToFile)
    writer.commitJob()
  }

  // TODO: these don't seem like the right abstractions.
  // We should abstract the duplicate code in a less awkward way.

  // return type: (output metrics, bytes written callback), defined only if the latter is defined
  private def initHadoopOutputMetrics(
      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
    bytesWrittenCallback.map { b =>
      (context.taskMetrics().outputMetrics, b)
    }
  }

  private def maybeUpdateOutputMetrics(
      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
      recordsWritten: Long): Unit = {
    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
        om.setBytesWritten(callback())
        om.setRecordsWritten(recordsWritten)
      }
    }
  }

  /**
   * Return an RDD with the keys of each tuple.
   */
  def keys: RDD[K] = self.map(_._1)

  /**
   * Return an RDD with the values of each tuple.
   */
  def values: RDD[V] = self.map(_._2)

  private[spark] def keyClass: Class[_] = kt.runtimeClass

  private[spark] def valueClass: Class[_] = vt.runtimeClass

  private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)

  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
  // setting can take effect:
  private def isOutputSpecValidationEnabled: Boolean = {
    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
    val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
    enabledInConf && !validationDisabled
  }
}

private[spark] object PairRDDFunctions {
  val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256

  /**
   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
   * basis; see SPARK-4835 for more details.
   */
  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
}