/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.apache.spark.rdd.RDD

class ImplicitOrderingSuite extends SparkFunSuite with LocalSparkContext {
  // Checks that PairRDDFunctions picks up an implicit Ordering in the cases where it should.
  test("basic inference of Orderings") {
    sc = new SparkContext("local", "test")
    val rdd = sc.parallelize(1 to 10)

    // The RDD operations themselves are built in the companion object, so that the closures
    // they create cannot reach the unserializable ScalaTest Engine.

    // Orderings inferred after basic maps to particular key types
    for ((met, explain) <- ImplicitOrderingSuite.basicMapExpectations(rdd)) {
      assert(met, explain)
    }

    // Orderings inferred for other RDD methods
    for ((met, explain) <- ImplicitOrderingSuite.otherRDDMethodExpectations(rdd)) {
      assert(met, explain)
    }
  }
}

private object ImplicitOrderingSuite {
  /** Key type for which the expectations below require `keyOrdering` to be empty. */
  class NonOrderedClass

  /**
   * Key type implementing `java.lang.Comparable`. `compareTo` always throws; the expectations
   * below only inspect whether an ordering is present.
   */
  class ComparableClass extends Comparable[ComparableClass] {
    override def compareTo(o: ComparableClass): Int = {
      throw new UnsupportedOperationException
    }
  }

  /**
   * Key type implementing `scala.math.Ordered`. `compare` always throws; the expectations
   * below only inspect whether an ordering is present.
   */
  class OrderedClass extends Ordered[OrderedClass] {
    override def compare(o: OrderedClass): Int = {
      throw new UnsupportedOperationException
    }
  }

  /**
   * Evaluates `keyOrdering` on pair RDDs produced by mapping `rdd` to various key types.
   *
   * @param rdd source RDD that each check maps into a pair RDD
   * @return one `(condition, description)` pair per check; `condition` is true when the
   *         expectation holds and `description` is the source text of the checked expression
   */
  def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
    List(
      rdd.map(x => (x, x)).keyOrdering.isDefined ->
        "rdd.map(x => (x, x)).keyOrdering.isDefined",
      rdd.map(x => (1, x)).keyOrdering.isDefined ->
        "rdd.map(x => (1, x)).keyOrdering.isDefined",
      rdd.map(x => (x.toString, x)).keyOrdering.isDefined ->
        "rdd.map(x => (x.toString, x)).keyOrdering.isDefined",
      rdd.map(x => (null, x)).keyOrdering.isDefined ->
        "rdd.map(x => (null, x)).keyOrdering.isDefined",
      // The only map case in which no ordering is expected.
      rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty ->
        "rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty",
      rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined ->
        "rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined",
      rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined ->
        "rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined")
  }

  /**
   * Evaluates `keyOrdering` on the results of the `groupBy` overloads applied to `rdd`.
   *
   * @param rdd source RDD that each check groups
   * @return one `(condition, description)` pair per check; `condition` is true when the
   *         expectation holds and `description` is the source text of the checked expression
   */
  def otherRDDMethodExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
    List(
      rdd.groupBy(x => x).keyOrdering.isDefined ->
        "rdd.groupBy(x => x).keyOrdering.isDefined",
      // The only groupBy case in which no ordering is expected.
      rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty ->
        "rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty",
      rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined ->
        "rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined",
      rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined ->
        "rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined",
      // Overloads taking an explicit partition count or partitioner
      rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined ->
        "rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined",
      rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined ->
        "rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined")
  }
}