1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.catalyst.plans
19
20import org.apache.spark.SparkFunSuite
21import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
22import org.apache.spark.sql.catalyst.plans.logical._
23import org.apache.spark.sql.types.IntegerType
24
25/**
26 * This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly
27 * skips sub-trees that have already been marked as analyzed.
28 */
29class LogicalPlanSuite extends SparkFunSuite {
30  private var invocationCount = 0
31  private val function: PartialFunction[LogicalPlan, LogicalPlan] = {
32    case p: Project =>
33      invocationCount += 1
34      p
35  }
36
37  private val testRelation = LocalRelation()
38
39  test("resolveOperator runs on operators") {
40    invocationCount = 0
41    val plan = Project(Nil, testRelation)
42    plan resolveOperators function
43
44    assert(invocationCount === 1)
45  }
46
47  test("resolveOperator runs on operators recursively") {
48    invocationCount = 0
49    val plan = Project(Nil, Project(Nil, testRelation))
50    plan resolveOperators function
51
52    assert(invocationCount === 2)
53  }
54
55  test("resolveOperator skips all ready resolved plans") {
56    invocationCount = 0
57    val plan = Project(Nil, Project(Nil, testRelation))
58    plan.foreach(_.setAnalyzed())
59    plan resolveOperators function
60
61    assert(invocationCount === 0)
62  }
63
64  test("resolveOperator skips partially resolved plans") {
65    invocationCount = 0
66    val plan1 = Project(Nil, testRelation)
67    val plan2 = Project(Nil, plan1)
68    plan1.foreach(_.setAnalyzed())
69    plan2 resolveOperators function
70
71    assert(invocationCount === 1)
72  }
73
74  test("isStreaming") {
75    val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
76    val incrementalRelation = new LocalRelation(
77      Seq(AttributeReference("a", IntegerType, nullable = true)())) {
78      override def isStreaming(): Boolean = true
79    }
80
81    case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
82      override def output: Seq[Attribute] = left.output ++ right.output
83    }
84
85    require(relation.isStreaming === false)
86    require(incrementalRelation.isStreaming === true)
87    assert(TestBinaryRelation(relation, relation).isStreaming === false)
88    assert(TestBinaryRelation(incrementalRelation, relation).isStreaming === true)
89    assert(TestBinaryRelation(relation, incrementalRelation).isStreaming === true)
90    assert(TestBinaryRelation(incrementalRelation, incrementalRelation).isStreaming)
91  }
92}
93