/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.internal

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
import org.apache.spark.sql.util.ExecutionListenerManager


/**
 * A class that holds all session-specific state in a given [[SparkSession]].
 */
private[sql] class SessionState(sparkSession: SparkSession) {

  // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
  // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.

  /**
   * SQL-specific key-value configurations.
   */
  lazy val conf: SQLConf = new SQLConf

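  /**
   * Returns a fresh Hadoop [[Configuration]] copied from the SparkContext's Hadoop
   * configuration, with all SQL configurations in [[conf]] applied on top.
   */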
  def newHadoopConf(): Configuration = {
    val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
    conf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
    hadoopConf
  }

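  /**
   * Returns a Hadoop [[Configuration]] built on top of [[newHadoopConf()]] with the given
   * options applied as well. Entries with null values and the "path"/"paths" keys are skipped.
   */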
  def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
    val hadoopConf = newHadoopConf()
    options.foreach { case (k, v) =>
      if ((v ne null) && k != "path" && k != "paths") {
        hadoopConf.set(k, v)
      }
    }
    hadoopConf
  }
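
  // Illustrative sketch: building a Hadoop configuration for a single read, where per-read
  // options override the session-wide SQL settings. The option key/value below is only a
  // hypothetical example.
  //   val hadoopConf = newHadoopConfWithOptions(
  //     Map("mapreduce.input.fileinputformat.split.maxsize" -> "134217728"))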

  lazy val experimentalMethods = new ExperimentalMethods

  /**
   * Internal catalog for managing functions registered by the user.
   */
  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()

  /**
   * A class for loading resources specified by a function.
   */
  lazy val functionResourceLoader: FunctionResourceLoader = {
    new FunctionResourceLoader {
      override def loadResource(resource: FunctionResource): Unit = {
        resource.resourceType match {
          case JarResource => addJar(resource.uri)
          case FileResource => sparkSession.sparkContext.addFile(resource.uri)
          case ArchiveResource =>
            throw new AnalysisException(
              "Archive resources are not allowed to be loaded. If YARN mode is used, " +
                "please use the --archives option when calling spark-submit.")
        }
      }
    }
  }
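
  // Illustrative sketch: roughly how a CREATE FUNCTION ... USING JAR command hands its
  // resources to this loader. The jar path below is hypothetical.
  //   functionResourceLoader.loadResource(
  //     FunctionResource(JarResource, "/tmp/example-udfs.jar"))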

  /**
   * Internal catalog for managing table and database states.
   */
  lazy val catalog = new SessionCatalog(
    sparkSession.sharedState.externalCatalog,
    sparkSession.sharedState.globalTempViewManager,
    functionResourceLoader,
    functionRegistry,
    conf,
    newHadoopConf())

  /**
   * Interface exposed to the user for registering user-defined functions.
   * Note that the user-defined functions must be deterministic.
   */
  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
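
  // Illustrative sketch: registering a deterministic UDF through this interface makes it
  // resolvable by name in SQL text. The function name below is hypothetical.
  //   udf.register("plusOne", (x: Int) => x + 1)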

  /**
   * Logical query plan analyzer for resolving unresolved attributes and relations.
   */
  lazy val analyzer: Analyzer = {
    new Analyzer(catalog, conf) {
      override val extendedResolutionRules =
        AnalyzeCreateTable(sparkSession) ::
        PreprocessTableInsertion(conf) ::
        new FindDataSourceTable(sparkSession) ::
        DataSourceAnalysis(conf) ::
        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

      override val extendedCheckRules =
        Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)
    }
  }

  /**
   * Logical query plan optimizer.
   */
  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)

  /**
   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
   */
  lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
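
  // Illustrative sketch: the parser turns SQL text into unresolved plans and identifiers,
  // which the analyzer above then resolves. The table name below is hypothetical.
  //   val plan = sqlParser.parsePlan("SELECT * FROM src")
  //   val ident = sqlParser.parseTableIdentifier("db.src")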

  /**
   * Planner that converts optimized logical plans to physical plans.
   */
  def planner: SparkPlanner =
    new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)

  /**
   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
   * that listen for execution metrics.
   */
  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
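
  // Illustrative sketch: registering a listener that reports successful executions. The
  // listener body below is only an example.
  //   listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
  //     override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
  //       println(s"$funcName finished in ${durationNs / 1e6} ms")
  //     override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
  //   })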

  /**
   * Interface to start and stop [[StreamingQuery]]s.
   */
  lazy val streamingQueryManager: StreamingQueryManager = {
    new StreamingQueryManager(sparkSession)
  }

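  // Class loader shared through SharedState across sessions of the same SparkContext;
  // `addJar` below appends jar URLs to it so their classes become visible to this session.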
  private val jarClassLoader: NonClosableMutableURLClassLoader =
    sparkSession.sharedState.jarClassLoader

  // Automatically extract all entries from the SparkConf and put them in our SQLConf.
  // We need to do this after all of the vals above have been initialized.
  sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
    conf.setConfString(k, v)
  }

  // ------------------------------------------------------
  //  Helper methods, partially leftover from pre-2.0 days
  // ------------------------------------------------------

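  /**
   * Wraps the given logical plan in a [[QueryExecution]], which lazily drives it through
   * the analyzer, optimizer and planner defined above.
   */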
  def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)

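  /**
   * Refreshes any cached metadata for the table with the given (unparsed) name in the
   * session catalog.
   */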
  def refreshTable(tableName: String): Unit = {
    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
  }

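  /**
   * Adds a jar to the SparkContext and to the session's shared jar class loader, so that
   * classes from the jar can be loaded (e.g. when registering a UDF backed by it).
   */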
  def addJar(path: String): Unit = {
    sparkSession.sparkContext.addJar(path)

    val uri = new Path(path).toUri
    val jarURL = if (uri.getScheme == null) {
      // `path` is a local file path without a URL scheme
      new File(path).toURI.toURL
    } else {
      // `path` is a URL with a scheme
      uri.toURL
    }
    jarClassLoader.addURL(jarURL)
    Thread.currentThread().setContextClassLoader(jarClassLoader)
  }

  /**
   * Analyzes the given table in the current database to generate statistics, which will be
   * used in query optimizations.
   */
  def analyze(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = {
    AnalyzeTableCommand(tableIdent, noscan).run(sparkSession)
  }
}