/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partitioner, RangePartitioner}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging

/**
 * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
 * an implicit conversion. They will work with any key type `K` that has an implicit `Ordering[K]`
 * in scope. Ordering objects already exist for all of the standard primitive types. Users can also
 * define their own orderings for custom types, or to override the default ordering. The implicit
 * ordering that is in the closest scope will be used.
 *
 * {{{
 *   import org.apache.spark.SparkContext._
 *
 *   val rdd: RDD[(String, Int)] = ...
 *   implicit val caseInsensitiveOrdering: Ordering[String] = new Ordering[String] {
 *     override def compare(a: String, b: String): Int = a.toLowerCase.compare(b.toLowerCase)
 *   }
 *
 *   // Sort by key, using the above case-insensitive ordering.
 *   rdd.sortByKey()
 * }}}
 */
class OrderedRDDFunctions[K : Ordering : ClassTag,
                          V: ClassTag,
                          P <: Product2[K, V] : ClassTag] @DeveloperApi() (
    self: RDD[P])
  extends Logging with Serializable {
  private val ordering = implicitly[Ordering[K]]

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
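   *
   * A minimal usage sketch (assuming an existing `sc: SparkContext`):
   * {{{
   *   val pairs = sc.parallelize(Seq(("b", 2), ("c", 3), ("a", 1)))
   *   pairs.sortByKey().collect()                    // Array((a,1), (b,2), (c,3))
   *   pairs.sortByKey(ascending = false).collect()   // Array((c,3), (b,2), (a,1))
   * }}}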
   */
  // TODO: this currently doesn't work on P other than Tuple2!
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope {
    // Range-partition by key; the shuffle then sorts records within each partition.
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

  /**
   * Repartition the RDD according to the given partitioner and, within each resulting partition,
   * sort records by their keys.
   *
   * This is more efficient than calling `repartition` and then sorting within each partition
   * because it can push the sorting down into the shuffle machinery.
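   *
   * A minimal usage sketch (assuming an existing `sc: SparkContext`):
   * {{{
   *   import org.apache.spark.HashPartitioner
   *
   *   val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (4, "d"), (2, "b")))
   *   // Each of the two hash partitions is sorted by key; the RDD is not globally sorted.
   *   pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
   * }}}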
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    // Attach the key ordering to the shuffle so sorting happens during the shuffle itself.
    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
  }

  /**
   * Returns an RDD containing only the elements in the inclusive range `lower` to `upper`.
   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
   * performed efficiently by only scanning the partitions that might contain matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
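   *
   * A minimal usage sketch (assuming an existing `sc: SparkContext`):
   * {{{
   *   // sortByKey range-partitions the data, so filterByRange can prune whole partitions.
   *   val sorted = sc.parallelize(Seq((5, "e"), (1, "a"), (9, "i"), (3, "c"))).sortByKey()
   *   sorted.filterByRange(2, 8).collect()   // Array((3,c), (5,e))
   * }}}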
   */
  def filterByRange(lower: K, upper: K): RDD[P] = self.withScope {

    def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)

    val rddToFilter: RDD[P] = self.partitioner match {
      case Some(rp: RangePartitioner[K, V]) =>
        // Only the partitions between the ones holding `lower` and `upper` can contain matches.
        val partitionIndices = (rp.getPartition(lower), rp.getPartition(upper)) match {
          case (l, u) => Math.min(l, u) to Math.max(l, u)
        }
        PartitionPruningRDD.create(self, partitionIndices.contains)
      case _ =>
        self
    }
    rddToFilter.filter { case (k, _) => inRange(k) }
  }

}