/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import java.util.{ServiceConfigurationError, ServiceLoader}

import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils

/**
 * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
 * acting as the canonical set of parameters that can describe a Data Source, this class is used to
 * resolve a description to a concrete implementation that can be used in a query plan
 * (either batch or streaming) or to write out data using an external library.
 *
 * From an end user's perspective a DataSource description can be created explicitly using
 * [[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL.  Additionally, this class is
 * used when resolving a description from a metastore to a concrete implementation.
 *
 * Many of the arguments to this class are optional, though depending on the specific API being used
 * these optional arguments might be filled in during resolution using either inference or external
 * metadata.  For example, when reading a partitioned table from a file system, partition columns
 * will be inferred from the directory layout even if they are not specified.
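 *
 * As an illustration only (the format name and path below are hypothetical examples, not values
 * taken from this file), a read such as the following is eventually resolved through a
 * `DataSource` instance:
 * {{{
 *   // spark.read builds a DataSource with className = "parquet" and a single path,
 *   // then calls resolveRelation() to obtain the relation used in the query plan.
 *   val df = spark.read.format("parquet").load("/some/path")
 * }}}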
 *
 * @param paths A list of file system paths that hold data. These will be globbed and qualified
 *              before use. This option only works when reading from a [[FileFormat]].
 * @param userSpecifiedSchema An optional specification of the schema of the data. When present,
 *                            we skip attempting to infer the schema.
 * @param partitionColumns A list of column names that the relation is partitioned by. This list is
 *                         generally empty during the read path, unless this DataSource is managed
 *                         by Hive. In these cases, during `resolveRelation`, we will call
 *                         `getOrInferFileFormatSchema` for file based DataSources to infer the
 *                         partitioning. In other cases, if this list is empty, then this table
 *                         is unpartitioned.
 * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
 * @param catalogTable Optional catalog table reference that can be used to push down operations
 *                     over the datasource to the catalog service.
 */
case class DataSource(
    sparkSession: SparkSession,
    className: String,
    paths: Seq[String] = Nil,
    userSpecifiedSchema: Option[StructType] = None,
    partitionColumns: Seq[String] = Seq.empty,
    bucketSpec: Option[BucketSpec] = None,
    options: Map[String, String] = Map.empty,
    catalogTable: Option[CatalogTable] = None) extends Logging {

  case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

  lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
  lazy val sourceInfo = sourceSchema()
  private val caseInsensitiveOptions = new CaseInsensitiveMap(options)

  /**
   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
   * it. In the read path, only tables managed by Hive provide the partition columns properly when
   * initializing this class. All other file based data sources will try to infer the partitioning,
   * and then cast the inferred types to the user specified dataTypes if the partition columns
   * exist inside `userSpecifiedSchema`; otherwise we can hit data corruption bugs like SPARK-18510.
   * This method will try to skip file scanning when both `userSpecifiedSchema` and
   * `partitionColumns` are provided. Here are some code paths that use this method (an
   * illustrative snippet follows the list):
   *   1. `spark.read` (no schema): Most work. Infer both the schema and the partitioning columns.
   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
   *     dataTypes provided in `userSpecifiedSchema` if they exist, or fall back to the inferred
   *     dataType if they don't.
   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
   *     provide the schema. Here, we also perform partition inference as in case 2, and try to use
   *     the dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will reuse
   *     this information, therefore calls to this method should be very cheap, i.e. there won't
   *     be any further inference in any triggers.
   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
   *     existing table's partitioning scheme. This is achieved by not providing
   *     `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
   *     exit, if we don't care about the schema of the original table.
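   *
   * As an illustration only (the column names and path below are hypothetical), case 2 above
   * corresponds to a call such as:
   * {{{
   *   // The partition column "year" keeps the user-specified IntegerType rather than the
   *   // type that would have been inferred from the directory names.
   *   spark.read
   *     .schema(new StructType().add("value", StringType).add("year", IntegerType))
   *     .parquet("/data/events")
   * }}}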
   *
   * @param format the file format object for this DataSource
   * @param justPartitioning Whether to exit early and provide just the partition schema.
   * @return A pair of the data schema (excluding partition columns) and the schema of the
   *         partition columns. If `justPartitioning` is `true`, then the data schema will be
   *         returned as `null`.
   */
  private def getOrInferFileFormatSchema(
      format: FileFormat,
      justPartitioning: Boolean = false): (StructType, StructType) = {
    // The operations below are expensive, so try not to do them if we don't need to. For example,
    // in streaming mode we have already inferred and registered partition columns, so we will
    // never have to materialize the lazy val below.
    lazy val tempFileIndex = {
      val allPaths = caseInsensitiveOptions.get("path") ++ paths
      val hadoopConf = sparkSession.sessionState.newHadoopConf()
      val globbedPaths = allPaths.toSeq.flatMap { path =>
        val hdfsPath = new Path(path)
        val fs = hdfsPath.getFileSystem(hadoopConf)
        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
        SparkHadoopUtil.get.globPathIfNecessary(qualified)
      }.toArray
      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
    }
    val partitionSchema = if (partitionColumns.isEmpty) {
      // Try to infer partitioning, because no DataSource in the read path provides the partitioning
      // columns properly unless it is a Hive DataSource
      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
        val equality = sparkSession.sessionState.conf.resolver
        // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
        userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
          partitionField)
      }
      StructType(resolved)
    } else {
      // Maintain the old behavior from before SPARK-18510: if userSpecifiedSchema is empty, use
      // the inferred partitioning
      if (userSpecifiedSchema.isEmpty) {
        val inferredPartitions = tempFileIndex.partitionSchema
        inferredPartitions
      } else {
        val partitionFields = partitionColumns.map { partitionColumn =>
          val equality = sparkSession.sessionState.conf.resolver
          userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
            val inferredPartitions = tempFileIndex.partitionSchema
            val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
            if (inferredOpt.isDefined) {
              logDebug(
                s"""Type of partition column: $partitionColumn not found in specified schema
                   |for $format.
                   |User Specified Schema
                   |=====================
                   |${userSpecifiedSchema.orNull}
                   |
                   |Falling back to inferred dataType if it exists.
                 """.stripMargin)
            }
            inferredOpt
          }.getOrElse {
            throw new AnalysisException(s"Failed to resolve the schema for $format for " +
              s"the partition column: $partitionColumn. It must be specified manually.")
          }
        }
        StructType(partitionFields)
      }
    }
    if (justPartitioning) {
      return (null, partitionSchema)
    }
    val dataSchema = userSpecifiedSchema.map { schema =>
      val equality = sparkSession.sessionState.conf.resolver
      StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
    }.orElse {
      format.inferSchema(
        sparkSession,
        caseInsensitiveOptions,
        tempFileIndex.allFiles())
    }.getOrElse {
      throw new AnalysisException(
        s"Unable to infer schema for $format. It must be specified manually.")
    }
    (dataSchema, partitionSchema)
  }

  /** Returns the name and schema of the source that can be used to continually read data. */
  private def sourceSchema(): SourceInfo = {
    providingClass.newInstance() match {
      case s: StreamSourceProvider =>
        val (name, schema) = s.sourceSchema(
          sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
        SourceInfo(name, schema, Nil)

      case format: FileFormat =>
        val path = caseInsensitiveOptions.getOrElse("path", {
          throw new IllegalArgumentException("'path' is not specified")
        })

        // Check whether the path exists if it is not a glob pattern.
        // For glob pattern, we do not check it because the glob pattern might only make sense
        // once the streaming job starts and some upstream source starts dropping data.
        val hdfsPath = new Path(path)
        if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
          if (!fs.exists(hdfsPath)) {
            throw new AnalysisException(s"Path does not exist: $path")
          }
        }

        val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
        val isTextSource = providingClass == classOf[text.TextFileFormat]
        // If schema inference is disabled, all sources other than the text source require the
        // schema to be specified explicitly
        if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
          throw new IllegalArgumentException(
            "Schema must be specified when creating a streaming source DataFrame. " +
              "If some files already exist in the directory, then depending on the file format " +
              "you may be able to create a static DataFrame on that directory with " +
              "'spark.read.load(directory)' and infer schema from it.")
        }
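        // Illustrative example (the schema and directory are hypothetical): with inference
        // disabled, a JSON file stream needs an explicit schema, e.g.
        //   spark.readStream.schema(userSchema).json("/data/in")
        // whereas the text source can always fall back to its fixed single "value" column schema.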
        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
        SourceInfo(
          s"FileSource[$path]",
          StructType(dataSchema ++ partitionSchema),
          partitionSchema.fieldNames)

      case _ =>
        throw new UnsupportedOperationException(
          s"Data source $className does not support streamed reading")
    }
  }

  /** Returns a source that can be used to continually read data. */
  def createSource(metadataPath: String): Source = {
    providingClass.newInstance() match {
      case s: StreamSourceProvider =>
        s.createSource(
          sparkSession.sqlContext,
          metadataPath,
          userSpecifiedSchema,
          className,
          caseInsensitiveOptions)

      case format: FileFormat =>
        val path = caseInsensitiveOptions.getOrElse("path", {
          throw new IllegalArgumentException("'path' is not specified")
        })
        new FileStreamSource(
          sparkSession = sparkSession,
          path = path,
          fileFormatClassName = className,
          schema = sourceInfo.schema,
          partitionColumns = sourceInfo.partitionColumns,
          metadataPath = metadataPath,
          options = caseInsensitiveOptions)
      case _ =>
        throw new UnsupportedOperationException(
          s"Data source $className does not support streamed reading")
    }
  }

  /** Returns a sink that can be used to continually write data. */
  def createSink(outputMode: OutputMode): Sink = {
    providingClass.newInstance() match {
      case s: StreamSinkProvider =>
        s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)

      case fileFormat: FileFormat =>
        val path = caseInsensitiveOptions.getOrElse("path", {
          throw new IllegalArgumentException("'path' is not specified")
        })
        if (outputMode != OutputMode.Append) {
          throw new AnalysisException(
            s"Data source $className does not support $outputMode output mode")
        }
        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)

      case _ =>
        throw new UnsupportedOperationException(
          s"Data source $className does not support streamed writing")
    }
  }

  /**
   * Returns true if there is a single path that has a metadata log indicating which files should
   * be read.
   */
  def hasMetadata(path: Seq[String]): Boolean = {
    path match {
      case Seq(singlePath) =>
        try {
          val hdfsPath = new Path(singlePath)
          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
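          // FileStreamSink.metadataDir ("_spark_metadata") is written by the streaming file sink;
          // its presence means the batch reader should trust that log rather than list the
          // directory contents directly (see the metadata-log branch in resolveRelation).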
          val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
          fs.exists(metadataPath)
        } catch {
          case NonFatal(e) =>
            logWarning("Error while looking for metadata directory.", e)
            false
        }
      case _ => false
    }
  }

  /**
   * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
   * [[DataSource]].
   *
   * @param checkFilesExist Whether to confirm that the files exist when generating the
   *                        non-streaming file based datasource. Structured Streaming jobs already
   *                        list files when they construct batches, and an incremental batch is
   *                        treated as a non-streaming file based data source. Since we know those
   *                        files already exist, we don't need to check them again.
   */
  def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
    val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
      // TODO: Throw when too much is given.
      case (dataSource: SchemaRelationProvider, Some(schema)) =>
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
      case (dataSource: RelationProvider, None) =>
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      case (_: SchemaRelationProvider, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $className.")
      case (dataSource: RelationProvider, Some(schema)) =>
        val baseRelation =
          dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
        if (baseRelation.schema != schema) {
          throw new AnalysisException(s"$className does not allow user-specified schemas.")
        }
        baseRelation

      // We are reading from the results of a streaming query. Load files from the metadata log
      // instead of listing them using HDFS APIs.
      case (format: FileFormat, _)
          if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
        val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
        val dataSchema = userSpecifiedSchema.orElse {
          format.inferSchema(
            sparkSession,
            caseInsensitiveOptions,
            fileCatalog.allFiles())
        }.getOrElse {
          throw new AnalysisException(
            s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
                "It must be specified manually")
        }

        HadoopFsRelation(
          fileCatalog,
          partitionSchema = fileCatalog.partitionSchema,
          dataSchema = dataSchema,
          bucketSpec = None,
          format,
          caseInsensitiveOptions)(sparkSession)

      // This is a non-streaming file based datasource.
      case (format: FileFormat, _) =>
        val allPaths = caseInsensitiveOptions.get("path") ++ paths
        val hadoopConf = sparkSession.sessionState.newHadoopConf()
        val globbedPaths = allPaths.flatMap { path =>
          val hdfsPath = new Path(path)
          val fs = hdfsPath.getFileSystem(hadoopConf)
          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
          val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)

          if (globPath.isEmpty) {
            throw new AnalysisException(s"Path does not exist: $qualified")
          }
          // Sufficient to check head of the globPath seq for non-glob scenario
          // Don't need to check once again if files exist in streaming mode
          if (checkFilesExist && !fs.exists(globPath.head)) {
            throw new AnalysisException(s"Path does not exist: ${globPath.head}")
          }
          globPath
        }.toArray

        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)

        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
          val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
          new CatalogFileIndex(
            sparkSession,
            catalogTable.get,
            catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
        } else {
          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
        }

        HadoopFsRelation(
          fileCatalog,
          partitionSchema = partitionSchema,
          dataSchema = dataSchema.asNullable,
          bucketSpec = bucketSpec,
          format,
          caseInsensitiveOptions)(sparkSession)

      case _ =>
        throw new AnalysisException(
          s"$className is not a valid Spark SQL Data Source.")
    }

    relation
  }

  /**
   * Writes the given [[DataFrame]] out in this [[FileFormat]].
   */
  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
    // Don't glob path for the write path.  The contracts here are:
    //  1. Only one output path can be specified on the write path;
    //  2. Output path must be a legal HDFS style file system path;
    //  3. It's OK that the output path doesn't exist yet;
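    // For illustration only (the path below is hypothetical): a call like
    //   df.write.parquet("hdfs://nn/warehouse/t")
    // reaches this method with that single, possibly not-yet-existing path as the 'path' option.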
    val allPaths = paths ++ caseInsensitiveOptions.get("path")
    val outputPath = if (allPaths.length == 1) {
      val path = new Path(allPaths.head)
      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
    } else {
      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
        s"got: ${allPaths.mkString(", ")}")
    }

    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
    PartitioningUtils.validatePartitionColumn(
      data.schema, partitionColumns, caseSensitive)

    // If we are appending to a table that already exists, make sure the partitioning matches
    // up.  If we fail to load the table for whatever reason, ignore the check.
    if (mode == SaveMode.Append) {
      val existingPartitionColumns = Try {
        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
      }.getOrElse(Seq.empty[String])
      // TODO: Case sensitivity.
      val sameColumns =
        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
      if (existingPartitionColumns.nonEmpty && !sameColumns) {
        throw new AnalysisException(
          s"""Requested partitioning does not match existing partitioning.
             |Existing partitioning columns:
             |  ${existingPartitionColumns.mkString(", ")}
             |Requested partitioning columns:
             |  ${partitionColumns.mkString(", ")}
             |""".stripMargin)
      }
    }

    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
    // not need to have the query as a child, which avoids analyzing an optimized query,
    // because InsertIntoHadoopFsRelationCommand will be optimized first.
    val columns = partitionColumns.map { name =>
      val plan = data.logicalPlan
      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
        throw new AnalysisException(
          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
      }.asInstanceOf[Attribute]
    }
    // For a partitioned relation r, r.schema's column ordering can be different from the column
    // ordering of data.logicalPlan (partition columns are all moved after the data columns).
    // This will be adjusted within InsertIntoHadoopFsRelation.
    val plan =
      InsertIntoHadoopFsRelationCommand(
        outputPath = outputPath,
        staticPartitionKeys = Map.empty,
        customPartitionLocations = Map.empty,
        partitionColumns = columns,
        bucketSpec = bucketSpec,
        fileFormat = format,
        refreshFunction = _ => (), // No existing table needs to be refreshed.
        options = options,
        query = data.logicalPlan,
        mode = mode,
        catalogTable = catalogTable)
    sparkSession.sessionState.executePlan(plan).toRdd
  }

  /**
   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] that
   * can be used to read the data back.
   */
  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
    }

    providingClass.newInstance() match {
      case dataSource: CreatableRelationProvider =>
        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
      case format: FileFormat =>
        writeInFileFormat(format, mode, data)
        // Replace the schema with that of the DataFrame that we just wrote out, to avoid
        // re-inferring it.
        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }

  /**
   * Writes the given [[DataFrame]] out to this [[DataSource]].
   */
  def write(mode: SaveMode, data: DataFrame): Unit = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
    }

    providingClass.newInstance() match {
      case dataSource: CreatableRelationProvider =>
        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
      case format: FileFormat =>
        writeInFileFormat(format, mode, data)
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }
}

object DataSource {

  /** A map to maintain backward compatibility in case we move data sources around. */
  private val backwardCompatibilityMap: Map[String, String] = {
    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
    val json = classOf[JsonFileFormat].getCanonicalName
    val parquet = classOf[ParquetFileFormat].getCanonicalName
    val csv = classOf[CSVFileFormat].getCanonicalName
    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"

    Map(
      "org.apache.spark.sql.jdbc" -> jdbc,
      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
      "org.apache.spark.sql.json" -> json,
      "org.apache.spark.sql.json.DefaultSource" -> json,
      "org.apache.spark.sql.execution.datasources.json" -> json,
      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
      "org.apache.spark.sql.parquet" -> parquet,
      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
      "org.apache.spark.sql.hive.orc" -> orc,
      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
      "org.apache.spark.ml.source.libsvm" -> libsvm,
      "com.databricks.spark.csv" -> csv
    )
  }
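  // Illustrative note: a legacy call such as spark.read.format("com.databricks.spark.csv") is
  // rewritten by the map above to the built-in CSVFileFormat before class loading happens.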

  /**
   * Classes that were removed in Spark 2.0. Used to detect incompatible libraries for Spark 2.0.
   */
  private val spark2RemovedClasses = Set(
    "org.apache.spark.sql.DataFrame",
    "org.apache.spark.sql.sources.HadoopFsRelationProvider",
    "org.apache.spark.Logging")

  /** Given a provider name, look up the data source class definition. */
  def lookupDataSource(provider: String): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
    val provider2 = s"$provider1.DefaultSource"
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

    try {
      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
        // the provider format did not match any given registered aliases
        case Nil =>
          try {
            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
              case Success(dataSource) =>
                // Found the data source using fully qualified path
                dataSource
              case Failure(error) =>
                if (provider1.toLowerCase == "orc" ||
                  provider1.startsWith("org.apache.spark.sql.hive.orc")) {
                  throw new AnalysisException(
                    "The ORC data source must be used with Hive support enabled")
                } else if (provider1.toLowerCase == "avro" ||
                  provider1 == "com.databricks.spark.avro") {
                  throw new AnalysisException(
                    s"Failed to find data source: ${provider1.toLowerCase}. Please find an Avro " +
                      "package at http://spark.apache.org/third-party-projects.html")
                } else {
                  throw new ClassNotFoundException(
                    s"Failed to find data source: $provider1. Please find packages at " +
                      "http://spark.apache.org/third-party-projects.html",
                    error)
                }
            }
          } catch {
            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
              // NoClassDefFoundError's class name uses "/" rather than "." for packages
              val className = e.getMessage.replaceAll("/", ".")
              if (spark2RemovedClasses.contains(className)) {
                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
                  "Please check if your library is compatible with Spark 2.0", e)
              } else {
                throw e
              }
          }
        case head :: Nil =>
          // there is exactly one registered alias
          head.getClass
        case sources =>
          // There are multiple registered aliases for the input
          sys.error(s"Multiple sources found for $provider1 " +
            s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
            "please specify the fully qualified class name.")
      }
    } catch {
      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
        // NoClassDefFoundError's class name uses "/" rather than "." for packages
        val className = e.getCause.getMessage.replaceAll("/", ".")
        if (spark2RemovedClasses.contains(className)) {
          throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
            "Please remove the incompatible library from classpath or upgrade it. " +
            s"Error: ${e.getMessage}", e)
        } else {
          throw e
        }
    }
  }
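  // Illustrative only (the argument values are examples): lookupDataSource("parquet") resolves
  // through the DataSourceRegister service loader to ParquetFileFormat, while a legacy name such
  // as "org.apache.spark.sql.parquet" is first rewritten via backwardCompatibilityMap.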

  /**
   * When creating a data source table, the `path` option has a special meaning: the table
   * location. This method extracts the `path` option and treats it as the table location to
   * build a [[CatalogStorageFormat]]. Note that the `path` option is removed from the options
   * afterwards.
   */
  def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
    val path = new CaseInsensitiveMap(options).get("path")
    val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
    CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
  }
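  // A minimal illustration (hypothetical values): buildStorageFormatFromOptions(
  //   Map("path" -> "/warehouse/t", "compression" -> "snappy"))
  // yields locationUri = Some("/warehouse/t") and properties = Map("compression" -> "snappy").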
}