/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, HashMap => JHashMap, Locale}

import scala.collection.{mutable, Map}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.DynamicVariable

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable {

  /**
   * :: Experimental ::
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   *
   * Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   *
   * @note V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]).
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }
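
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). combineByKeyWithClassTag can build a per-key (sum, count) accumulator and
   * derive an average from it:
   *
   *   val scores = sc.parallelize(Seq(("a", 1.0), ("b", 4.0), ("a", 3.0)))
   *   val sumCounts = scores.combineByKeyWithClassTag(
   *     (v: Double) => (v, 1L),                                                 // createCombiner
   *     (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),          // mergeValue
   *     (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),   // mergeCombiners
   *     new HashPartitioner(2))
   *   val averages = sumCounts.mapValues { case (sum, count) => sum / count }
   */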

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  }

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   * This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  }

  /**
   * :: Experimental ::
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      new HashPartitioner(numPartitions))
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  }

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
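
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). aggregateByKey collects the distinct values seen per key into a Set, using a
   * mutable zero value that seqOp/combOp are allowed to modify in place:
   *
   *   val events = sc.parallelize(Seq(("user1", "click"), ("user1", "view"), ("user2", "click")))
   *   val distinctActions = events.aggregateByKey(scala.collection.mutable.Set.empty[String])(
   *     (set, action) => set += action,   // merge a value into the per-partition accumulator
   *     (s1, s2) => s1 ++= s2)            // merge accumulators across partitions
   */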

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    // When deserializing, use a lazy val to create just one instance of the serializer per task
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

    val cleanedFunc = self.context.clean(func)
    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
      cleanedFunc, cleanedFunc, partitioner)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
  }
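
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). foldByKey is reduceByKey with a neutral zero value of the same type as the
   * values:
   *
   *   val wordCounts = sc.parallelize(Seq(("spark", 2), ("rdd", 1), ("spark", 3)))
   *   val totals = wordCounts.foldByKey(0)(_ + _)   // ("spark", 5), ("rdd", 1)
   */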

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
   * RDD, to produce a sample of size that's approximately equal to the sum of
   * math.ceil(numItems * samplingRate) over all key values.
   *
   * @param withReplacement whether to sample with or without replacement
   * @param fractions map of specific keys to sampling rates
   * @param seed seed for the random number generator
   * @return RDD containing the sampled subset
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: Map[K, Double],
      seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {

    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

    val samplingFunc = if (withReplacement) {
      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
    } else {
      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
    }
    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
  }
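
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). Each key is sampled at its own rate, taken from the `fractions` map:
   *
   *   val data = sc.parallelize(1 to 1000).map(i => (if (i % 2 == 0) "even" else "odd", i))
   *   val fractions = Map("even" -> 0.1, "odd" -> 0.5)
   *   val sample = data.sampleByKey(withReplacement = false, fractions = fractions, seed = 42L)
   */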

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
   * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
   *
   * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
   * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
   * over all key values with a 99.99% confidence. When sampling without replacement, we need one
   * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
   * two additional passes.
   *
   * @param withReplacement whether to sample with or without replacement
   * @param fractions map of specific keys to sampling rates
   * @param seed seed for the random number generator
   * @return RDD containing the sampled subset
   */
  def sampleByKeyExact(
      withReplacement: Boolean,
      fractions: Map[K, Double],
      seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {

    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

    val samplingFunc = if (withReplacement) {
      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, true, seed)
    } else {
      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
    }
    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
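
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). A classic word count with reduceByKey, which combines values per key on the
   * map side before shuffling:
   *
   *   val words = sc.parallelize(Seq("a", "b", "a", "c", "a"))
   *   val counts = words.map(w => (w, 1)).reduceByKey(_ + _)   // ("a", 3), ("b", 1), ("c", 1)
   */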

  /**
   * Merge the values for each key using an associative and commutative reduce function, but return
   * the results immediately to the master as a Map. This will also perform the merging locally on
   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
    val cleanedF = self.sparkContext.clean(func)

    if (keyClass.isArray) {
      throw new SparkException("reduceByKeyLocally() does not support array keys")
    }

    val reducePartition = (iter: Iterator[(K, V)]) => {
      val map = new JHashMap[K, V]
      iter.foreach { pair =>
        val old = map.get(pair._1)
        map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
      }
      Iterator(map)
    } : Iterator[JHashMap[K, V]]

    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
      m2.asScala.foreach { pair =>
        val old = m1.get(pair._1)
        m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
      }
      m1
    } : JHashMap[K, V]

    self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
  }

  /**
   * Count the number of elements for each key, collecting the results to a local Map.
   *
   * @note This method should only be used if the resulting map is expected to be small, as
   * the whole thing is loaded into the driver's memory.
   * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
   * returns an RDD[(K, Long)] instead of a map.
   */
  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }
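
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). countByKey materializes all per-key counts on the driver, so it should only be
   * used when the number of distinct keys is small:
   *
   *   val visits = sc.parallelize(Seq(("home", "u1"), ("home", "u2"), ("about", "u1")))
   *   val pageCounts = visits.countByKey()   // Map("home" -> 2, "about" -> 1)
   */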

  /**
   * Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
      : PartialResult[Map[K, BoundedDouble]] = self.withScope {
    self.map(_._1).countByValueApprox(timeout, confidence)
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp` (greater
   * than `p`) would trigger sparse representation of registers, which may reduce the memory
   * consumption and increase accuracy when the cardinality is small.
   *
   * @param p The precision value for the normal set.
   *          `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
   * @param sp The precision value for the sparse set, between 0 and 32.
   *           If `sp` equals 0, the sparse representation is skipped.
   * @param partitioner Partitioner to use for the resulting RDD.
   */
  def countApproxDistinctByKey(
      p: Int,
      sp: Int,
      partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
    require(p >= 4, s"p ($p) must be >= 4")
    require(sp <= 32, s"sp ($sp) must be <= 32")
    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
    val createHLL = (v: V) => {
      val hll = new HyperLogLogPlus(p, sp)
      hll.offer(v)
      hll
    }
    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
      hll.offer(v)
      hll
    }
    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
      h1.addAll(h2)
      h1
    }

    combineByKeyWithClassTag(createHLL, mergeValueHLL, mergeHLL, partitioner)
      .mapValues(_.cardinality())
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param partitioner partitioner of the resulting RDD
   */
  def countApproxDistinctByKey(
      relativeSD: Double,
      partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
    assert(p <= 32)
    countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param numPartitions number of partitions of the resulting RDD
   */
  def countApproxDistinctByKey(
      relativeSD: Double,
      numPartitions: Int): RDD[(K, Long)] = self.withScope {
    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = self.withScope {
    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
  }
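
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). The HyperLogLog-based estimate trades exactness for a small, bounded amount of
   * memory per key:
   *
   *   val pageViews = sc.parallelize(Seq(("home", "u1"), ("home", "u1"), ("home", "u2")))
   *   val approxUniqueUsers = pageViews.countApproxDistinctByKey(0.01)
   *   // roughly ("home", 2), within the requested relative accuracy
   */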

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   * The ordering of elements within each group is not guaranteed, and may even differ
   * each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }
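
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). groupByKey shuffles every value, so prefer reduceByKey/aggregateByKey when an
   * aggregate is all that is needed:
   *
   *   val orders = sc.parallelize(Seq(("cust1", 10.0), ("cust2", 5.0), ("cust1", 7.5)))
   *   val byCustomer = orders.groupByKey()     // ("cust1", Iterable(10.0, 7.5)), ("cust2", ...)
   *   val totals = orders.reduceByKey(_ + _)   // usually the better choice for plain sums
   */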

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD into `numPartitions` partitions. The ordering of elements within
   * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
   */
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def leftOuterJoin[W](
      other: RDD[(K, W)],
      partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._2.isEmpty) {
        pair._1.iterator.map(v => (v, None))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
      }
    }
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._1.isEmpty) {
        pair._2.iterator.map(w => (None, w))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
      }
    }
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
   */
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
      case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
      case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
    }
  }
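
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). The outer joins keep unmatched keys from one or both sides, wrapping values in
   * Option:
   *
   *   val names = sc.parallelize(Seq((1, "alice"), (2, "bob")))
   *   val ages = sc.parallelize(Seq((1, 30), (3, 25)))
   *   names.leftOuterJoin(ages).collect()
   *   // (1, ("alice", Some(30))), (2, ("bob", None))
   *   names.fullOuterJoin(ages).collect()
   *   // (1, (Some("alice"), Some(30))), (2, (Some("bob"), None)), (3, (None, Some(25)))
   */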

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
   * existing partitioner/parallelism level. This method is here for backward compatibility. It
   * does not provide combiner classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

  /**
   * :: Experimental ::
   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
   * existing partitioner/parallelism level.
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
   * within each group is not guaranteed, and may even differ each time the resulting RDD is
   * evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   */
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    join(other, defaultPartitioner(self, other))
  }
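
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). An inner join keeps only keys present in both RDDs:
   *
   *   val products = sc.parallelize(Seq((1, "laptop"), (2, "phone")))
   *   val prices = sc.parallelize(Seq((1, 999.0), (2, 499.0), (3, 199.0)))
   *   products.join(prices).collect()   // (1, ("laptop", 999.0)), (2, ("phone", 499.0))
   */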

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
    join(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * using the existing partitioner/parallelism level.
   */
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope {
    leftOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * into `numPartitions` partitions.
   */
  def leftOuterJoin[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (V, Option[W]))] = self.withScope {
    leftOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD using the existing partitioner/parallelism level.
   */
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope {
    rightOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD into the given number of partitions.
   */
  def rightOuterJoin[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Option[V], W))] = self.withScope {
    rightOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
   * parallelism level.
   */
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = self.withScope {
    fullOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
   * element (k, w) in `other`, the resulting RDD will either contain all pairs
   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
   */
  def fullOuterJoin[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = self.withScope {
    fullOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   *
   * Warning: this doesn't return a multimap (so if you have multiple values for the same key, only
   *          one value per key is preserved in the map returned)
   *
   * @note this method should only be used if the resulting data is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collectAsMap(): Map[K, V] = self.withScope {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
  }
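
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). Because only one value per key survives, duplicate keys are silently collapsed:
   *
   *   val props = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
   *   val asMap = props.collectAsMap()   // keeps only one of the two values for "a"
   */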

  /**
   * Pass each value in the key-value pair RDD through a map function without changing the keys;
   * this also retains the original RDD's partitioning.
   */
  def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
      preservesPartitioning = true)
  }

  /**
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
   */
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.flatMap { case (k, v) =>
        cleanF(v).map(x => (k, x))
      },
      preservesPartitioning = true)
  }
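
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). Unlike map, mapValues and flatMapValues leave keys (and hence the partitioner)
   * untouched:
   *
   *   val sentences = sc.parallelize(Seq((1, "hello world"), (2, "spark")))
   *   val upper = sentences.mapValues(_.toUpperCase)       // (1, "HELLO WORLD"), (2, "SPARK")
   *   val words = sentences.flatMapValues(_.split(" "))    // (1, "hello"), (1, "world"), (2, "spark")
   */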

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
       (vs.asInstanceOf[Iterable[V]],
         w1s.asInstanceOf[Iterable[W1]],
         w2s.asInstanceOf[Iterable[W2]],
         w3s.asInstanceOf[Iterable[W3]])
    }
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s) =>
      (vs.asInstanceOf[Iterable[V]],
        w1s.asInstanceOf[Iterable[W1]],
        w2s.asInstanceOf[Iterable[W2]])
    }
  }

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    cogroup(other, defaultPartitioner(self, other))
  }
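
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). cogroup groups all values for a key across both RDDs, even when one side has
   * no entries for that key:
   *
   *   val clicks = sc.parallelize(Seq(("u1", "home"), ("u1", "about")))
   *   val purchases = sc.parallelize(Seq(("u2", 19.99)))
   *   clicks.cogroup(purchases).collect()
   *   // ("u1", (Iterable("home", "about"), Iterable())), ("u2", (Iterable(), Iterable(19.99)))
   */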

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    cogroup(other, new HashPartitioner(numPartitions))
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    cogroup(other1, other2, new HashPartitioner(numPartitions))
  }

  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      numPartitions: Int)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
  }

  /** Alias for cogroup. */
  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    cogroup(other, defaultPartitioner(self, other))
  }

  /** Alias for cogroup. */
  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

  /** Alias for cogroup. */
  def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be no larger than this one.
   */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope {
    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))
  }
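
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). subtractByKey acts like an anti-join on keys:
   *
   *   val all = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
   *   val blocked = sc.parallelize(Seq(("b", 0), ("c", 0)))
   *   val remaining = all.subtractByKey(blocked)   // ("a", 1)
   */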

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W: ClassTag](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, V)] = self.withScope {
    subtractByKey(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope {
    new SubtractedRDD[K, V, W](self, other, p)
  }

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): Seq[V] = self.withScope {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for (pair <- it if pair._1 == key) {
            buf += pair._2
          }
          buf
        } : Seq[V]
        val res = self.context.runJob(self, process, Array(index))
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }
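
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc`). With a known partitioner, lookup runs a job on only the single partition the
   * key maps to:
   *
   *   val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
   *     .partitionBy(new HashPartitioner(4))
   *   val values = pairs.lookup("a")   // Seq(1, 3), scanning only one partition
   */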

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }
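
  /*
   * Usage sketch (illustrative only, not part of the original source; assumes a SparkContext
   * named `sc` and an output path the caller can write to). The OutputFormat is supplied as a
   * type parameter; TextOutputFormat from the old `org.apache.hadoop.mapred` API writes each
   * pair as a "key<TAB>value" line:
   *
   *   import org.apache.hadoop.mapred.TextOutputFormat
   *   val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
   *   counts.saveAsHadoopFile[TextOutputFormat[String, Int]]("hdfs:///tmp/counts")
   */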

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress the result with the
   * supplied codec.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String,
      codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
    val runtimeClass = fm.runtimeClass
    saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
    saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = NewAPIHadoopJob.getInstance(hadoopConf)
    job.setOutputKeyClass(keyClass)
    job.setOutputValueClass(valueClass)
    job.setOutputFormatClass(outputFormatClass)
    val jobConfiguration = job.getConfiguration
    jobConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDataset(jobConfiguration)
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      codec: Class[_ <: CompressionCodec]): Unit = self.withScope {
    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
      new JobConf(self.context.hadoopConfiguration), Some(codec))
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   *
   * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
   * not use output committer that writes data directly.
   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
   * result of using direct output committer with speculation enabled.
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    hadoopConf.setOutputKeyClass(keyClass)
    hadoopConf.setOutputValueClass(valueClass)
    hadoopConf.setOutputFormat(outputFormatClass)
    for (c <- codec) {
      hadoopConf.setCompressMapOutput(true)
      hadoopConf.set("mapred.output.compress", "true")
      hadoopConf.setMapOutputCompressorClass(c)
      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }

    // Use configured output committer if already set
    if (hadoopConf.getOutputCommitter == null) {
      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
    }

    // When speculation is on and output committer class name contains "Direct", we should warn
    // users that they may lose data if they are using a direct output committer.
    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    FileOutputFormat.setOutputPath(hadoopConf,
      SparkHadoopWriter.createPathFromString(path, hadoopConf))
    saveAsHadoopDataset(hadoopConf)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
   * Configuration object for that storage system. The Conf should set an OutputFormat and any
   * output paths required (e.g. a table name to write to) in the same way as it would be
   * configured for a Hadoop MapReduce job.
   *
   * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
   * not use output committer that writes data directly.
   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
   * result of using direct output committer with speculation enabled.
   */
  def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = NewAPIHadoopJob.getInstance(hadoopConf)
    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    val jobtrackerID = formatter.format(new Date())
    val stageId = self.id
    val jobConfiguration = job.getConfiguration
    val wrappedConf = new SerializableConfiguration(jobConfiguration)
    val outfmt = job.getOutputFormatClass
    val jobFormat = outfmt.newInstance

    if (isOutputSpecValidationEnabled) {
      // FileOutputFormat ignores the filesystem parameter
      jobFormat.checkOutputSpecs(job)
    }

    val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
      val config = wrappedConf.value
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
        context.attemptNumber)
      val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
      val format = outfmt.newInstance
      format match {
        case c: Configurable => c.setConf(config)
        case _ => ()
      }
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)

      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
        initHadoopOutputMetrics(context)

      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
      require(writer != null, "Unable to obtain RecordWriter")
      var recordsWritten = 0L
      Utils.tryWithSafeFinallyAndFailureCallbacks {
        while (iter.hasNext) {
          val pair = iter.next()
          writer.write(pair._1, pair._2)

          // Update bytes written metric every few records
          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
          recordsWritten += 1
        }
      }(finallyBlock = writer.close(hadoopContext))
      committer.commitTask(hadoopContext)
      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
        om.setBytesWritten(callback())
        om.setRecordsWritten(recordsWritten)
      }
      1
    } : Int

    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)

    // When speculation is on and output committer class name contains "Direct", we should warn
    // users that they may lose data if they are using a direct output committer.
    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = jobCommitter.getClass.getSimpleName
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    jobCommitter.setupJob(jobTaskContext)
    self.context.runJob(self, writeShard)
    jobCommitter.commitJob(jobTaskContext)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
   * that storage system. The JobConf should set an OutputFormat and any output paths required
   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
   * MapReduce job.
   */
  def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val outputFormatInstance = hadoopConf.getOutputFormat
    val keyClass = hadoopConf.getOutputKeyClass
    val valueClass = hadoopConf.getOutputValueClass
    if (outputFormatInstance == null) {
      throw new SparkException("Output format class not set")
    }
    if (keyClass == null) {
      throw new SparkException("Output key class not set")
    }
    if (valueClass == null) {
      throw new SparkException("Output value class not set")
    }
    SparkHadoopUtil.get.addCredentials(hadoopConf)

    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
      valueClass.getSimpleName + ")")

    if (isOutputSpecValidationEnabled) {
      // FileOutputFormat ignores the filesystem parameter
      val ignoredFs = FileSystem.get(hadoopConf)
      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
    }

    val writer = new SparkHadoopWriter(hadoopConf)
    writer.preSetup()

    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
        initHadoopOutputMetrics(context)

      writer.setup(context.stageId, context.partitionId, taskAttemptId)
      writer.open()
      var recordsWritten = 0L

      Utils.tryWithSafeFinallyAndFailureCallbacks {
        while (iter.hasNext) {
          val record = iter.next()
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

          // Update bytes written metric every few records
          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
          recordsWritten += 1
        }
      }(finallyBlock = writer.close())
      writer.commit()
      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
        om.setBytesWritten(callback())
        om.setRecordsWritten(recordsWritten)
      }
    }

    self.context.runJob(self, writeToFile)
    writer.commitJob()
  }

  // TODO: these don't seem like the right abstractions.
  // We should abstract the duplicate code in a less awkward way.

  // return type: (output metrics, bytes written callback), defined only if the latter is defined
  private def initHadoopOutputMetrics(
      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
    bytesWrittenCallback.map { b =>
      (context.taskMetrics().outputMetrics, b)
    }
  }

  private def maybeUpdateOutputMetrics(
      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
      recordsWritten: Long): Unit = {
    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
        om.setBytesWritten(callback())
        om.setRecordsWritten(recordsWritten)
      }
    }
  }

  /**
   * Return an RDD with the keys of each tuple.
   */
  def keys: RDD[K] = self.map(_._1)

  /**
   * Return an RDD with the values of each tuple.
   */
  def values: RDD[V] = self.map(_._2)

  private[spark] def keyClass: Class[_] = kt.runtimeClass

  private[spark] def valueClass: Class[_] = vt.runtimeClass

  private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)

  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
  // setting can take effect:
  private def isOutputSpecValidationEnabled: Boolean = {
    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
    val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
    enabledInConf && !validationDisabled
  }
}

private[spark] object PairRDDFunctions {
  val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256

  /**
   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
   * basis; see SPARK-4835 for more details.
   */
  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
}