1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark
19
20import org.apache.spark.rdd.RDD
21
22class ImplicitOrderingSuite extends SparkFunSuite with LocalSparkContext {
23  // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should.
24  test("basic inference of Orderings") {
25    sc = new SparkContext("local", "test")
26    val rdd = sc.parallelize(1 to 10)
27
28    // These RDD methods are in the companion object so that the unserializable ScalaTest Engine
29    // won't be reachable from the closure object
30
31    // Infer orderings after basic maps to particular types
32    val basicMapExpectations = ImplicitOrderingSuite.basicMapExpectations(rdd)
33    basicMapExpectations.foreach { case (met, explain) => assert(met, explain) }
34
35    // Infer orderings for other RDD methods
36    val otherRDDMethodExpectations = ImplicitOrderingSuite.otherRDDMethodExpectations(rdd)
37    otherRDDMethodExpectations.foreach { case (met, explain) => assert(met, explain) }
38  }
39}
40
41private object ImplicitOrderingSuite {
42  class NonOrderedClass {}
43
44  class ComparableClass extends Comparable[ComparableClass] {
45    override def compareTo(o: ComparableClass): Int = throw new UnsupportedOperationException
46  }
47
48  class OrderedClass extends Ordered[OrderedClass] {
49    override def compare(o: OrderedClass): Int = throw new UnsupportedOperationException
50  }
51
52  def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
53    List((rdd.map(x => (x, x)).keyOrdering.isDefined,
54            "rdd.map(x => (x, x)).keyOrdering.isDefined"),
55          (rdd.map(x => (1, x)).keyOrdering.isDefined,
56            "rdd.map(x => (1, x)).keyOrdering.isDefined"),
57          (rdd.map(x => (x.toString, x)).keyOrdering.isDefined,
58            "rdd.map(x => (x.toString, x)).keyOrdering.isDefined"),
59          (rdd.map(x => (null, x)).keyOrdering.isDefined,
60            "rdd.map(x => (null, x)).keyOrdering.isDefined"),
61          (rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty,
62            "rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty"),
63          (rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined,
64            "rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined"),
65          (rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined,
66            "rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined"))
67  }
68
69  def otherRDDMethodExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
70    List((rdd.groupBy(x => x).keyOrdering.isDefined,
71           "rdd.groupBy(x => x).keyOrdering.isDefined"),
72         (rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty,
73           "rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty"),
74         (rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined,
75           "rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined"),
76         (rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined,
77           "rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined"),
78         (rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined,
79           "rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined"),
80         (rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined,
81           "rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined"))
82  }
83}
84