1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.sql.execution.datasources 19 20import java.util.{ServiceConfigurationError, ServiceLoader} 21 22import scala.collection.JavaConverters._ 23import scala.language.{existentials, implicitConversions} 24import scala.util.{Failure, Success, Try} 25import scala.util.control.NonFatal 26 27import org.apache.hadoop.fs.Path 28 29import org.apache.spark.deploy.SparkHadoopUtil 30import org.apache.spark.internal.Logging 31import org.apache.spark.sql._ 32import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable} 33import org.apache.spark.sql.catalyst.expressions.Attribute 34import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap 35import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat 36import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider 37import org.apache.spark.sql.execution.datasources.json.JsonFileFormat 38import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat 39import org.apache.spark.sql.execution.streaming._ 40import org.apache.spark.sql.sources._ 41import org.apache.spark.sql.streaming.OutputMode 42import org.apache.spark.sql.types.{CalendarIntervalType, StructType} 43import org.apache.spark.util.Utils 44 45/** 46 * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to 47 * acting as the canonical set of parameters that can describe a Data Source, this class is used to 48 * resolve a description to a concrete implementation that can be used in a query plan 49 * (either batch or streaming) or to write out data using an external library. 50 * 51 * From an end user's perspective a DataSource description can be created explicitly using 52 * [[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL. Additionally, this class is 53 * used when resolving a description from a metastore to a concrete implementation. 54 * 55 * Many of the arguments to this class are optional, though depending on the specific API being used 56 * these optional arguments might be filled in during resolution using either inference or external 57 * metadata. For example, when reading a partitioned table from a file system, partition columns 58 * will be inferred from the directory layout even if they are not specified. 59 * 60 * @param paths A list of file system paths that hold data. These will be globbed before and 61 * qualified. This option only works when reading from a [[FileFormat]]. 62 * @param userSpecifiedSchema An optional specification of the schema of the data. When present 63 * we skip attempting to infer the schema. 64 * @param partitionColumns A list of column names that the relation is partitioned by. This list is 65 * generally empty during the read path, unless this DataSource is managed 66 * by Hive. In these cases, during `resolveRelation`, we will call 67 * `getOrInferFileFormatSchema` for file based DataSources to infer the 68 * partitioning. In other cases, if this list is empty, then this table 69 * is unpartitioned. 70 * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data. 71 * @param catalogTable Optional catalog table reference that can be used to push down operations 72 * over the datasource to the catalog service. 73 */ 74case class DataSource( 75 sparkSession: SparkSession, 76 className: String, 77 paths: Seq[String] = Nil, 78 userSpecifiedSchema: Option[StructType] = None, 79 partitionColumns: Seq[String] = Seq.empty, 80 bucketSpec: Option[BucketSpec] = None, 81 options: Map[String, String] = Map.empty, 82 catalogTable: Option[CatalogTable] = None) extends Logging { 83 84 case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) 85 86 lazy val providingClass: Class[_] = DataSource.lookupDataSource(className) 87 lazy val sourceInfo = sourceSchema() 88 private val caseInsensitiveOptions = new CaseInsensitiveMap(options) 89 90 /** 91 * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer 92 * it. In the read path, only managed tables by Hive provide the partition columns properly when 93 * initializing this class. All other file based data sources will try to infer the partitioning, 94 * and then cast the inferred types to user specified dataTypes if the partition columns exist 95 * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510. 96 * This method will try to skip file scanning whether `userSpecifiedSchema` and 97 * `partitionColumns` are provided. Here are some code paths that use this method: 98 * 1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns 99 * 2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the 100 * dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred 101 * dataType if they don't. 102 * 3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to 103 * provide the schema. Here, we also perform partition inference like 2, and try to use 104 * dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use 105 * this information, therefore calls to this method should be very cheap, i.e. there won't 106 * be any further inference in any triggers. 107 * 4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the 108 * existing table's partitioning scheme. This is achieved by not providing 109 * `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early 110 * exit, if we don't care about the schema of the original table. 111 * 112 * @param format the file format object for this DataSource 113 * @param justPartitioning Whether to exit early and provide just the schema partitioning. 114 * @return A pair of the data schema (excluding partition columns) and the schema of the partition 115 * columns. If `justPartitioning` is `true`, then the dataSchema will be provided as 116 * `null`. 117 */ 118 private def getOrInferFileFormatSchema( 119 format: FileFormat, 120 justPartitioning: Boolean = false): (StructType, StructType) = { 121 // the operations below are expensive therefore try not to do them if we don't need to, e.g., 122 // in streaming mode, we have already inferred and registered partition columns, we will 123 // never have to materialize the lazy val below 124 lazy val tempFileIndex = { 125 val allPaths = caseInsensitiveOptions.get("path") ++ paths 126 val hadoopConf = sparkSession.sessionState.newHadoopConf() 127 val globbedPaths = allPaths.toSeq.flatMap { path => 128 val hdfsPath = new Path(path) 129 val fs = hdfsPath.getFileSystem(hadoopConf) 130 val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) 131 SparkHadoopUtil.get.globPathIfNecessary(qualified) 132 }.toArray 133 new InMemoryFileIndex(sparkSession, globbedPaths, options, None) 134 } 135 val partitionSchema = if (partitionColumns.isEmpty) { 136 // Try to infer partitioning, because no DataSource in the read path provides the partitioning 137 // columns properly unless it is a Hive DataSource 138 val resolved = tempFileIndex.partitionSchema.map { partitionField => 139 val equality = sparkSession.sessionState.conf.resolver 140 // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred 141 userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( 142 partitionField) 143 } 144 StructType(resolved) 145 } else { 146 // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred 147 // partitioning 148 if (userSpecifiedSchema.isEmpty) { 149 val inferredPartitions = tempFileIndex.partitionSchema 150 inferredPartitions 151 } else { 152 val partitionFields = partitionColumns.map { partitionColumn => 153 val equality = sparkSession.sessionState.conf.resolver 154 userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse { 155 val inferredPartitions = tempFileIndex.partitionSchema 156 val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn)) 157 if (inferredOpt.isDefined) { 158 logDebug( 159 s"""Type of partition column: $partitionColumn not found in specified schema 160 |for $format. 161 |User Specified Schema 162 |===================== 163 |${userSpecifiedSchema.orNull} 164 | 165 |Falling back to inferred dataType if it exists. 166 """.stripMargin) 167 } 168 inferredOpt 169 }.getOrElse { 170 throw new AnalysisException(s"Failed to resolve the schema for $format for " + 171 s"the partition column: $partitionColumn. It must be specified manually.") 172 } 173 } 174 StructType(partitionFields) 175 } 176 } 177 if (justPartitioning) { 178 return (null, partitionSchema) 179 } 180 val dataSchema = userSpecifiedSchema.map { schema => 181 val equality = sparkSession.sessionState.conf.resolver 182 StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name)))) 183 }.orElse { 184 format.inferSchema( 185 sparkSession, 186 caseInsensitiveOptions, 187 tempFileIndex.allFiles()) 188 }.getOrElse { 189 throw new AnalysisException( 190 s"Unable to infer schema for $format. It must be specified manually.") 191 } 192 (dataSchema, partitionSchema) 193 } 194 195 /** Returns the name and schema of the source that can be used to continually read data. */ 196 private def sourceSchema(): SourceInfo = { 197 providingClass.newInstance() match { 198 case s: StreamSourceProvider => 199 val (name, schema) = s.sourceSchema( 200 sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions) 201 SourceInfo(name, schema, Nil) 202 203 case format: FileFormat => 204 val path = caseInsensitiveOptions.getOrElse("path", { 205 throw new IllegalArgumentException("'path' is not specified") 206 }) 207 208 // Check whether the path exists if it is not a glob pattern. 209 // For glob pattern, we do not check it because the glob pattern might only make sense 210 // once the streaming job starts and some upstream source starts dropping data. 211 val hdfsPath = new Path(path) 212 if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) { 213 val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) 214 if (!fs.exists(hdfsPath)) { 215 throw new AnalysisException(s"Path does not exist: $path") 216 } 217 } 218 219 val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference 220 val isTextSource = providingClass == classOf[text.TextFileFormat] 221 // If the schema inference is disabled, only text sources require schema to be specified 222 if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) { 223 throw new IllegalArgumentException( 224 "Schema must be specified when creating a streaming source DataFrame. " + 225 "If some files already exist in the directory, then depending on the file format " + 226 "you may be able to create a static DataFrame on that directory with " + 227 "'spark.read.load(directory)' and infer schema from it.") 228 } 229 val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format) 230 SourceInfo( 231 s"FileSource[$path]", 232 StructType(dataSchema ++ partitionSchema), 233 partitionSchema.fieldNames) 234 235 case _ => 236 throw new UnsupportedOperationException( 237 s"Data source $className does not support streamed reading") 238 } 239 } 240 241 /** Returns a source that can be used to continually read data. */ 242 def createSource(metadataPath: String): Source = { 243 providingClass.newInstance() match { 244 case s: StreamSourceProvider => 245 s.createSource( 246 sparkSession.sqlContext, 247 metadataPath, 248 userSpecifiedSchema, 249 className, 250 caseInsensitiveOptions) 251 252 case format: FileFormat => 253 val path = caseInsensitiveOptions.getOrElse("path", { 254 throw new IllegalArgumentException("'path' is not specified") 255 }) 256 new FileStreamSource( 257 sparkSession = sparkSession, 258 path = path, 259 fileFormatClassName = className, 260 schema = sourceInfo.schema, 261 partitionColumns = sourceInfo.partitionColumns, 262 metadataPath = metadataPath, 263 options = caseInsensitiveOptions) 264 case _ => 265 throw new UnsupportedOperationException( 266 s"Data source $className does not support streamed reading") 267 } 268 } 269 270 /** Returns a sink that can be used to continually write data. */ 271 def createSink(outputMode: OutputMode): Sink = { 272 providingClass.newInstance() match { 273 case s: StreamSinkProvider => 274 s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode) 275 276 case fileFormat: FileFormat => 277 val path = caseInsensitiveOptions.getOrElse("path", { 278 throw new IllegalArgumentException("'path' is not specified") 279 }) 280 if (outputMode != OutputMode.Append) { 281 throw new AnalysisException( 282 s"Data source $className does not support $outputMode output mode") 283 } 284 new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions) 285 286 case _ => 287 throw new UnsupportedOperationException( 288 s"Data source $className does not support streamed writing") 289 } 290 } 291 292 /** 293 * Returns true if there is a single path that has a metadata log indicating which files should 294 * be read. 295 */ 296 def hasMetadata(path: Seq[String]): Boolean = { 297 path match { 298 case Seq(singlePath) => 299 try { 300 val hdfsPath = new Path(singlePath) 301 val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) 302 val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir) 303 val res = fs.exists(metadataPath) 304 res 305 } catch { 306 case NonFatal(e) => 307 logWarning(s"Error while looking for metadata directory.") 308 false 309 } 310 case _ => false 311 } 312 } 313 314 /** 315 * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this 316 * [[DataSource]] 317 * 318 * @param checkFilesExist Whether to confirm that the files exist when generating the 319 * non-streaming file based datasource. StructuredStreaming jobs already 320 * list file existence, and when generating incremental jobs, the batch 321 * is considered as a non-streaming file based data source. Since we know 322 * that files already exist, we don't need to check them again. 323 */ 324 def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = { 325 val relation = (providingClass.newInstance(), userSpecifiedSchema) match { 326 // TODO: Throw when too much is given. 327 case (dataSource: SchemaRelationProvider, Some(schema)) => 328 dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema) 329 case (dataSource: RelationProvider, None) => 330 dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) 331 case (_: SchemaRelationProvider, None) => 332 throw new AnalysisException(s"A schema needs to be specified when using $className.") 333 case (dataSource: RelationProvider, Some(schema)) => 334 val baseRelation = 335 dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) 336 if (baseRelation.schema != schema) { 337 throw new AnalysisException(s"$className does not allow user-specified schemas.") 338 } 339 baseRelation 340 341 // We are reading from the results of a streaming query. Load files from the metadata log 342 // instead of listing them using HDFS APIs. 343 case (format: FileFormat, _) 344 if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => 345 val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) 346 val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath) 347 val dataSchema = userSpecifiedSchema.orElse { 348 format.inferSchema( 349 sparkSession, 350 caseInsensitiveOptions, 351 fileCatalog.allFiles()) 352 }.getOrElse { 353 throw new AnalysisException( 354 s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " + 355 "It must be specified manually") 356 } 357 358 HadoopFsRelation( 359 fileCatalog, 360 partitionSchema = fileCatalog.partitionSchema, 361 dataSchema = dataSchema, 362 bucketSpec = None, 363 format, 364 caseInsensitiveOptions)(sparkSession) 365 366 // This is a non-streaming file based datasource. 367 case (format: FileFormat, _) => 368 val allPaths = caseInsensitiveOptions.get("path") ++ paths 369 val hadoopConf = sparkSession.sessionState.newHadoopConf() 370 val globbedPaths = allPaths.flatMap { path => 371 val hdfsPath = new Path(path) 372 val fs = hdfsPath.getFileSystem(hadoopConf) 373 val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) 374 val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified) 375 376 if (globPath.isEmpty) { 377 throw new AnalysisException(s"Path does not exist: $qualified") 378 } 379 // Sufficient to check head of the globPath seq for non-glob scenario 380 // Don't need to check once again if files exist in streaming mode 381 if (checkFilesExist && !fs.exists(globPath.head)) { 382 throw new AnalysisException(s"Path does not exist: ${globPath.head}") 383 } 384 globPath 385 }.toArray 386 387 val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format) 388 389 val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && 390 catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { 391 val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes 392 new CatalogFileIndex( 393 sparkSession, 394 catalogTable.get, 395 catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) 396 } else { 397 new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema)) 398 } 399 400 HadoopFsRelation( 401 fileCatalog, 402 partitionSchema = partitionSchema, 403 dataSchema = dataSchema.asNullable, 404 bucketSpec = bucketSpec, 405 format, 406 caseInsensitiveOptions)(sparkSession) 407 408 case _ => 409 throw new AnalysisException( 410 s"$className is not a valid Spark SQL Data Source.") 411 } 412 413 relation 414 } 415 416 /** 417 * Writes the given [[DataFrame]] out in this [[FileFormat]]. 418 */ 419 private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = { 420 // Don't glob path for the write path. The contracts here are: 421 // 1. Only one output path can be specified on the write path; 422 // 2. Output path must be a legal HDFS style file system path; 423 // 3. It's OK that the output path doesn't exist yet; 424 val allPaths = paths ++ caseInsensitiveOptions.get("path") 425 val outputPath = if (allPaths.length == 1) { 426 val path = new Path(allPaths.head) 427 val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) 428 path.makeQualified(fs.getUri, fs.getWorkingDirectory) 429 } else { 430 throw new IllegalArgumentException("Expected exactly one path to be specified, but " + 431 s"got: ${allPaths.mkString(", ")}") 432 } 433 434 val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis 435 PartitioningUtils.validatePartitionColumn( 436 data.schema, partitionColumns, caseSensitive) 437 438 // If we are appending to a table that already exists, make sure the partitioning matches 439 // up. If we fail to load the table for whatever reason, ignore the check. 440 if (mode == SaveMode.Append) { 441 val existingPartitionColumns = Try { 442 getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList 443 }.getOrElse(Seq.empty[String]) 444 // TODO: Case sensitivity. 445 val sameColumns = 446 existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase()) 447 if (existingPartitionColumns.nonEmpty && !sameColumns) { 448 throw new AnalysisException( 449 s"""Requested partitioning does not match existing partitioning. 450 |Existing partitioning columns: 451 | ${existingPartitionColumns.mkString(", ")} 452 |Requested partitioning columns: 453 | ${partitionColumns.mkString(", ")} 454 |""".stripMargin) 455 } 456 } 457 458 // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does 459 // not need to have the query as child, to avoid to analyze an optimized query, 460 // because InsertIntoHadoopFsRelationCommand will be optimized first. 461 val columns = partitionColumns.map { name => 462 val plan = data.logicalPlan 463 plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse { 464 throw new AnalysisException( 465 s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]") 466 }.asInstanceOf[Attribute] 467 } 468 // For partitioned relation r, r.schema's column ordering can be different from the column 469 // ordering of data.logicalPlan (partition columns are all moved after data column). This 470 // will be adjusted within InsertIntoHadoopFsRelation. 471 val plan = 472 InsertIntoHadoopFsRelationCommand( 473 outputPath = outputPath, 474 staticPartitionKeys = Map.empty, 475 customPartitionLocations = Map.empty, 476 partitionColumns = columns, 477 bucketSpec = bucketSpec, 478 fileFormat = format, 479 refreshFunction = _ => Unit, // No existing table needs to be refreshed. 480 options = options, 481 query = data.logicalPlan, 482 mode = mode, 483 catalogTable = catalogTable) 484 sparkSession.sessionState.executePlan(plan).toRdd 485 } 486 487 /** 488 * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for 489 * the following reading. 490 */ 491 def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = { 492 if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { 493 throw new AnalysisException("Cannot save interval data type into external storage.") 494 } 495 496 providingClass.newInstance() match { 497 case dataSource: CreatableRelationProvider => 498 dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data) 499 case format: FileFormat => 500 writeInFileFormat(format, mode, data) 501 // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it. 502 copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation() 503 case _ => 504 sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") 505 } 506 } 507 508 /** 509 * Writes the given [[DataFrame]] out to this [[DataSource]]. 510 */ 511 def write(mode: SaveMode, data: DataFrame): Unit = { 512 if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { 513 throw new AnalysisException("Cannot save interval data type into external storage.") 514 } 515 516 providingClass.newInstance() match { 517 case dataSource: CreatableRelationProvider => 518 dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data) 519 case format: FileFormat => 520 writeInFileFormat(format, mode, data) 521 case _ => 522 sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") 523 } 524 } 525} 526 527object DataSource { 528 529 /** A map to maintain backward compatibility in case we move data sources around. */ 530 private val backwardCompatibilityMap: Map[String, String] = { 531 val jdbc = classOf[JdbcRelationProvider].getCanonicalName 532 val json = classOf[JsonFileFormat].getCanonicalName 533 val parquet = classOf[ParquetFileFormat].getCanonicalName 534 val csv = classOf[CSVFileFormat].getCanonicalName 535 val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" 536 val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" 537 538 Map( 539 "org.apache.spark.sql.jdbc" -> jdbc, 540 "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc, 541 "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc, 542 "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc, 543 "org.apache.spark.sql.json" -> json, 544 "org.apache.spark.sql.json.DefaultSource" -> json, 545 "org.apache.spark.sql.execution.datasources.json" -> json, 546 "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json, 547 "org.apache.spark.sql.parquet" -> parquet, 548 "org.apache.spark.sql.parquet.DefaultSource" -> parquet, 549 "org.apache.spark.sql.execution.datasources.parquet" -> parquet, 550 "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet, 551 "org.apache.spark.sql.hive.orc.DefaultSource" -> orc, 552 "org.apache.spark.sql.hive.orc" -> orc, 553 "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, 554 "org.apache.spark.ml.source.libsvm" -> libsvm, 555 "com.databricks.spark.csv" -> csv 556 ) 557 } 558 559 /** 560 * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0. 561 */ 562 private val spark2RemovedClasses = Set( 563 "org.apache.spark.sql.DataFrame", 564 "org.apache.spark.sql.sources.HadoopFsRelationProvider", 565 "org.apache.spark.Logging") 566 567 /** Given a provider name, look up the data source class definition. */ 568 def lookupDataSource(provider: String): Class[_] = { 569 val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) 570 val provider2 = s"$provider1.DefaultSource" 571 val loader = Utils.getContextOrSparkClassLoader 572 val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) 573 574 try { 575 serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match { 576 // the provider format did not match any given registered aliases 577 case Nil => 578 try { 579 Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match { 580 case Success(dataSource) => 581 // Found the data source using fully qualified path 582 dataSource 583 case Failure(error) => 584 if (provider1.toLowerCase == "orc" || 585 provider1.startsWith("org.apache.spark.sql.hive.orc")) { 586 throw new AnalysisException( 587 "The ORC data source must be used with Hive support enabled") 588 } else if (provider1.toLowerCase == "avro" || 589 provider1 == "com.databricks.spark.avro") { 590 throw new AnalysisException( 591 s"Failed to find data source: ${provider1.toLowerCase}. Please find an Avro " + 592 "package at http://spark.apache.org/third-party-projects.html") 593 } else { 594 throw new ClassNotFoundException( 595 s"Failed to find data source: $provider1. Please find packages at " + 596 "http://spark.apache.org/third-party-projects.html", 597 error) 598 } 599 } 600 } catch { 601 case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal 602 // NoClassDefFoundError's class name uses "/" rather than "." for packages 603 val className = e.getMessage.replaceAll("/", ".") 604 if (spark2RemovedClasses.contains(className)) { 605 throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " + 606 "Please check if your library is compatible with Spark 2.0", e) 607 } else { 608 throw e 609 } 610 } 611 case head :: Nil => 612 // there is exactly one registered alias 613 head.getClass 614 case sources => 615 // There are multiple registered aliases for the input 616 sys.error(s"Multiple sources found for $provider1 " + 617 s"(${sources.map(_.getClass.getName).mkString(", ")}), " + 618 "please specify the fully qualified class name.") 619 } 620 } catch { 621 case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] => 622 // NoClassDefFoundError's class name uses "/" rather than "." for packages 623 val className = e.getCause.getMessage.replaceAll("/", ".") 624 if (spark2RemovedClasses.contains(className)) { 625 throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " + 626 "Please remove the incompatible library from classpath or upgrade it. " + 627 s"Error: ${e.getMessage}", e) 628 } else { 629 throw e 630 } 631 } 632 } 633 634 /** 635 * When creating a data source table, the `path` option has a special meaning: the table location. 636 * This method extracts the `path` option and treat it as table location to build a 637 * [[CatalogStorageFormat]]. Note that, the `path` option is removed from options after this. 638 */ 639 def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = { 640 val path = new CaseInsensitiveMap(options).get("path") 641 val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path") 642 CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath) 643 } 644} 645