/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.internal

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
import org.apache.spark.sql.util.ExecutionListenerManager


/**
 * A class that holds all session-specific state in a given [[SparkSession]].
 */
private[sql] class SessionState(sparkSession: SparkSession) {

  // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
  // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.

  /**
   * SQL-specific key-value configurations.
   */
  lazy val conf: SQLConf = new SQLConf

  def newHadoopConf(): Configuration = {
    val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
    conf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
    hadoopConf
  }

  def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
    val hadoopConf = newHadoopConf()
    options.foreach { case (k, v) =>
      if ((v ne null) && k != "path" && k != "paths") {
        hadoopConf.set(k, v)
      }
    }
    hadoopConf
  }

  lazy val experimentalMethods = new ExperimentalMethods

  /**
   * Internal catalog for managing functions registered by the user.
   */
  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()

  /**
   * A class for loading resources specified by a function.
   */
  lazy val functionResourceLoader: FunctionResourceLoader = {
    new FunctionResourceLoader {
      override def loadResource(resource: FunctionResource): Unit = {
        resource.resourceType match {
          case JarResource => addJar(resource.uri)
          case FileResource => sparkSession.sparkContext.addFile(resource.uri)
          case ArchiveResource =>
            throw new AnalysisException(
              "Archives are not allowed to be loaded. If YARN mode is used, " +
              "please use the --archives option when calling spark-submit.")
        }
      }
    }
  }
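  // Example (illustrative sketch): a JAR-backed UDF resource is routed through
  // `addJar` below, a plain file through `SparkContext.addFile`, and an archive
  // is rejected outright. The JAR path here is hypothetical:
  //
  //   functionResourceLoader.loadResource(
  //     FunctionResource(JarResource, "hdfs:///tmp/udfs/my-udf.jar"))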
  /**
   * Internal catalog for managing table and database states.
   */
  lazy val catalog = new SessionCatalog(
    sparkSession.sharedState.externalCatalog,
    sparkSession.sharedState.globalTempViewManager,
    functionResourceLoader,
    functionRegistry,
    conf,
    newHadoopConf())

  /**
   * Interface exposed to the user for registering user-defined functions.
   * Note that the user-defined functions must be deterministic.
   */
  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)

  /**
   * Logical query plan analyzer for resolving unresolved attributes and relations.
   */
  lazy val analyzer: Analyzer = {
    new Analyzer(catalog, conf) {
      override val extendedResolutionRules =
        AnalyzeCreateTable(sparkSession) ::
        PreprocessTableInsertion(conf) ::
        new FindDataSourceTable(sparkSession) ::
        DataSourceAnalysis(conf) ::
        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

      override val extendedCheckRules =
        Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)
    }
  }

  /**
   * Logical query plan optimizer.
   */
  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)

  /**
   * Parser that extracts expressions, plans, table identifiers, etc. from SQL texts.
   */
  lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)

  /**
   * Planner that converts optimized logical plans to physical plans.
   */
  def planner: SparkPlanner =
    new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)

  /**
   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
   * that listen for execution metrics.
   */
  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager

  /**
   * Interface to start and stop [[StreamingQuery]]s.
   */
  lazy val streamingQueryManager: StreamingQueryManager = {
    new StreamingQueryManager(sparkSession)
  }

  private val jarClassLoader: NonClosableMutableURLClassLoader =
    sparkSession.sharedState.jarClassLoader

  // Automatically extract all entries from the SparkConf and put them in our SQLConf.
  // We need to do this after all of the vals above have been initialized.
  sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
    conf.setConfString(k, v)
  }

  // ------------------------------------------------------
  //  Helper methods, partially leftover from pre-2.0 days
  // ------------------------------------------------------

  def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)

  def refreshTable(tableName: String): Unit = {
    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
  }

  def addJar(path: String): Unit = {
    sparkSession.sparkContext.addJar(path)

    val uri = new Path(path).toUri
    val jarURL = if (uri.getScheme == null) {
      // `path` is a local file path without a URL scheme
      new File(path).toURI.toURL
    } else {
      // `path` is a URL with a scheme
      uri.toURL
    }
    jarClassLoader.addURL(jarURL)
    Thread.currentThread().setContextClassLoader(jarClassLoader)
  }

  /**
   * Analyzes the given table in the current database to generate statistics, which will be
   * used in query optimizations.
   */
  def analyze(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = {
    AnalyzeTableCommand(tableIdent, noscan).run(sparkSession)
  }
}
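
// Usage sketch: `SessionState` is reached through `SparkSession.sessionState`,
// which is `private[sql]`, so the lines below only compile inside the
// `org.apache.spark.sql` package. The master URL and table name are hypothetical.
//
//   val spark = SparkSession.builder().master("local").getOrCreate()
//   val state = spark.sessionState
//   val plan = state.sqlParser.parsePlan("SELECT 1")
//   val qe = state.executePlan(plan)  // exposes the analyzed/optimized/physical plans
//   state.refreshTable("my_table")    // invalidates cached metadata for `my_table`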