/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.IntegerType

/**
 * Tests for [[LogicalPlan]]'s `resolveOperators`, checking that the rule is applied to
 * operators and that sub-trees already marked as analyzed are skipped. Also covers how
 * `isStreaming` is reported by a binary node for each combination of children.
 */
class LogicalPlanSuite extends SparkFunSuite {

  private val testRelation = LocalRelation()

  /**
   * Applies `resolveOperators` to `plan` with a rule that matches every [[Project]] and hands
   * it back unchanged, then reports how many times that rule fired.
   */
  private def countProjectVisits(plan: LogicalPlan): Int = {
    var visits = 0
    plan.resolveOperators {
      case project: Project =>
        visits += 1
        project
    }
    visits
  }

  test("resolveOperator runs on operators") {
    val singleProject = Project(Nil, testRelation)

    assert(countProjectVisits(singleProject) === 1)
  }

  test("resolveOperator runs on operators recursively") {
    val nestedProjects = Project(Nil, Project(Nil, testRelation))

    assert(countProjectVisits(nestedProjects) === 2)
  }

  test("resolveOperator skips all ready resolved plans") {
    val nestedProjects = Project(Nil, Project(Nil, testRelation))
    // Mark every node of the tree as analyzed so the rule has nothing left to visit.
    nestedProjects.foreach { node => node.setAnalyzed() }

    assert(countProjectVisits(nestedProjects) === 0)
  }

  test("resolveOperator skips partially resolved plans") {
    val inner = Project(Nil, testRelation)
    val outer = Project(Nil, inner)
    // Only the inner sub-tree is marked as analyzed; the outer Project is still visited.
    inner.foreach { node => node.setAnalyzed() }

    assert(countProjectVisits(outer) === 1)
  }

  test("isStreaming") {
    // Each call builds a fresh attribute, so the two relations do not share one.
    def intAttribute: AttributeReference = AttributeReference("a", IntegerType, nullable = true)()

    val relation = LocalRelation(intAttribute)
    val incrementalRelation = new LocalRelation(Seq(intAttribute)) {
      override def isStreaming(): Boolean = true
    }

    case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
      override def output: Seq[Attribute] = left.output ++ right.output
    }

    // Preconditions on the fixtures themselves.
    require(!relation.isStreaming)
    require(incrementalRelation.isStreaming)

    // (left child, right child, expected isStreaming of the binary node)
    val expectations: Seq[(LogicalPlan, LogicalPlan, Boolean)] = Seq(
      (relation, relation, false),
      (incrementalRelation, relation, true),
      (relation, incrementalRelation, true),
      (incrementalRelation, incrementalRelation, true))

    expectations.foreach { case (leftChild, rightChild, expected) =>
      assert(TestBinaryRelation(leftChild, rightChild).isStreaming === expected)
    }
  }
}