1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.catalyst.catalog
19
20import javax.annotation.concurrent.GuardedBy
21
22import scala.collection.mutable
23
24import org.apache.hadoop.conf.Configuration
25import org.apache.hadoop.fs.Path
26
27import org.apache.spark.internal.Logging
28import org.apache.spark.sql.AnalysisException
29import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
30import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
31import org.apache.spark.sql.catalyst.analysis._
32import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
33import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
34import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
35import org.apache.spark.sql.catalyst.util.StringUtils
36
/** Companion holding constants shared by every [[SessionCatalog]] instance. */
object SessionCatalog {
  /** Name of the database a session catalog starts in. */
  val DEFAULT_DATABASE: String = "default"
}
40
41/**
42 * An internal catalog that is used by a Spark Session. This internal catalog serves as a
43 * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
44 * tables and functions of the Spark Session that it belongs to.
45 *
46 * This class must be thread-safe.
47 */
48class SessionCatalog(
49    externalCatalog: ExternalCatalog,
50    globalTempViewManager: GlobalTempViewManager,
51    functionResourceLoader: FunctionResourceLoader,
52    functionRegistry: FunctionRegistry,
53    conf: CatalystConf,
54    hadoopConf: Configuration) extends Logging {
55  import SessionCatalog._
56  import CatalogTypes.TablePartitionSpec
57
58  // For testing only.
59  def this(
60      externalCatalog: ExternalCatalog,
61      functionRegistry: FunctionRegistry,
62      conf: CatalystConf) {
63    this(
64      externalCatalog,
65      new GlobalTempViewManager("global_temp"),
66      DummyFunctionResourceLoader,
67      functionRegistry,
68      conf,
69      new Configuration())
70  }
71
72  // For testing only.
73  def this(externalCatalog: ExternalCatalog) {
74    this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true))
75  }
76
77  /** List of temporary tables, mapping from table name to their logical plan. */
78  @GuardedBy("this")
79  protected val tempTables = new mutable.HashMap[String, LogicalPlan]
80
81  // Note: we track current database here because certain operations do not explicitly
82  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
83  // check whether the temporary table or function exists, then, if not, operate on
84  // the corresponding item in the current database.
85  @GuardedBy("this")
86  protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
87
88  /**
   * Checks if the given name conforms to the Hive standard ("[a-zA-Z_0-9]+"),
   * i.e. if this name only contains letters, digits, and _.
91   *
92   * This method is intended to have the same behavior of
93   * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
94   */
95  private def validateName(name: String): Unit = {
96    val validNameFormat = "([\\w_]+)".r
97    if (!validNameFormat.pattern.matcher(name).matches()) {
98      throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
99        "Valid names only contain alphabet characters, numbers and _.")
100    }
101  }
102
103  /**
104   * Format table name, taking into account case sensitivity.
105   */
106  protected[this] def formatTableName(name: String): String = {
107    if (conf.caseSensitiveAnalysis) name else name.toLowerCase
108  }
109
110  /**
111   * Format database name, taking into account case sensitivity.
112   */
113  protected[this] def formatDatabaseName(name: String): String = {
114    if (conf.caseSensitiveAnalysis) name else name.toLowerCase
115  }
116
117  /**
118   * This method is used to make the given path qualified before we
119   * store this path in the underlying external catalog. So, when a path
120   * does not contain a scheme, this path will not be changed after the default
121   * FileSystem is changed.
122   */
123  private def makeQualifiedPath(path: String): Path = {
124    val hadoopPath = new Path(path)
125    val fs = hadoopPath.getFileSystem(hadoopConf)
126    fs.makeQualified(hadoopPath)
127  }
128
129  private def requireDbExists(db: String): Unit = {
130    if (!databaseExists(db)) {
131      throw new NoSuchDatabaseException(db)
132    }
133  }
134
135  private def requireTableExists(name: TableIdentifier): Unit = {
136    if (!tableExists(name)) {
137      val db = name.database.getOrElse(currentDb)
138      throw new NoSuchTableException(db = db, table = name.table)
139    }
140  }
141
142  private def requireTableNotExists(name: TableIdentifier): Unit = {
143    if (tableExists(name)) {
144      val db = name.database.getOrElse(currentDb)
145      throw new TableAlreadyExistsException(db = db, table = name.table)
146    }
147  }
148  // ----------------------------------------------------------------------------
149  // Databases
150  // ----------------------------------------------------------------------------
151  // All methods in this category interact directly with the underlying catalog.
152  // ----------------------------------------------------------------------------
153
154  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
155    val dbName = formatDatabaseName(dbDefinition.name)
156    if (dbName == globalTempViewManager.database) {
157      throw new AnalysisException(
158        s"${globalTempViewManager.database} is a system preserved database, " +
159          "you cannot create a database with this name.")
160    }
161    validateName(dbName)
162    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
163    externalCatalog.createDatabase(
164      dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
165      ignoreIfExists)
166  }
167
168  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
169    val dbName = formatDatabaseName(db)
170    if (dbName == DEFAULT_DATABASE) {
171      throw new AnalysisException(s"Can not drop default database")
172    }
173    externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
174  }
175
176  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
177    val dbName = formatDatabaseName(dbDefinition.name)
178    requireDbExists(dbName)
179    externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
180  }
181
182  def getDatabaseMetadata(db: String): CatalogDatabase = {
183    val dbName = formatDatabaseName(db)
184    requireDbExists(dbName)
185    externalCatalog.getDatabase(dbName)
186  }
187
188  def databaseExists(db: String): Boolean = {
189    val dbName = formatDatabaseName(db)
190    externalCatalog.databaseExists(dbName)
191  }
192
193  def listDatabases(): Seq[String] = {
194    externalCatalog.listDatabases()
195  }
196
197  def listDatabases(pattern: String): Seq[String] = {
198    externalCatalog.listDatabases(pattern)
199  }
200
201  def getCurrentDatabase: String = synchronized { currentDb }
202
203  def setCurrentDatabase(db: String): Unit = {
204    val dbName = formatDatabaseName(db)
205    if (dbName == globalTempViewManager.database) {
206      throw new AnalysisException(
207        s"${globalTempViewManager.database} is a system preserved database, " +
208          "you cannot use it as current database. To access global temporary views, you should " +
209          "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " +
210          s"${globalTempViewManager.database}.viewName.")
211    }
212    requireDbExists(dbName)
213    synchronized { currentDb = dbName }
214  }
215
216  /**
217   * Get the path for creating a non-default database when database location is not provided
218   * by users.
219   */
220  def getDefaultDBPath(db: String): String = {
221    val database = formatDatabaseName(db)
222    new Path(new Path(conf.warehousePath), database + ".db").toString
223  }
224
225  // ----------------------------------------------------------------------------
226  // Tables
227  // ----------------------------------------------------------------------------
228  // There are two kinds of tables, temporary tables and metastore tables.
229  // Temporary tables are isolated across sessions and do not belong to any
230  // particular database. Metastore tables can be used across multiple
231  // sessions as their metadata is persisted in the underlying catalog.
232  // ----------------------------------------------------------------------------
233
234  // ----------------------------------------------------
235  // | Methods that interact with metastore tables only |
236  // ----------------------------------------------------
237
238  /**
239   * Create a metastore table in the database specified in `tableDefinition`.
240   * If no such database is specified, create it in the current database.
241   */
242  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
243    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
244    val table = formatTableName(tableDefinition.identifier.table)
245    validateName(table)
246    val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
247    requireDbExists(db)
248    externalCatalog.createTable(newTableDefinition, ignoreIfExists)
249  }
250
251  /**
252   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
253   *
254   * If no database is specified in `tableDefinition`, assume the table is in the
255   * current database.
256   *
257   * Note: If the underlying implementation does not support altering a certain field,
258   * this becomes a no-op.
259   */
260  def alterTable(tableDefinition: CatalogTable): Unit = {
261    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
262    val table = formatTableName(tableDefinition.identifier.table)
263    val tableIdentifier = TableIdentifier(table, Some(db))
264    val newTableDefinition = tableDefinition.copy(identifier = tableIdentifier)
265    requireDbExists(db)
266    requireTableExists(tableIdentifier)
267    externalCatalog.alterTable(newTableDefinition)
268  }
269
270  /**
271   * Return whether a table/view with the specified name exists. If no database is specified, check
272   * with current database.
273   */
274  def tableExists(name: TableIdentifier): Boolean = synchronized {
275    val db = formatDatabaseName(name.database.getOrElse(currentDb))
276    val table = formatTableName(name.table)
277    externalCatalog.tableExists(db, table)
278  }
279
280  /**
281   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
282   * assume the table/view is in the current database. If the specified table/view is not found
283   * in the database then a [[NoSuchTableException]] is thrown.
284   */
285  def getTableMetadata(name: TableIdentifier): CatalogTable = {
286    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
287    val table = formatTableName(name.table)
288    requireDbExists(db)
289    requireTableExists(TableIdentifier(table, Some(db)))
290    externalCatalog.getTable(db, table)
291  }
292
293  /**
294   * Retrieve the metadata of an existing metastore table.
295   * If no database is specified, assume the table is in the current database.
   * If the specified table is not found in the database then None is returned.
297   */
298  def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = {
299    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
300    val table = formatTableName(name.table)
301    requireDbExists(db)
302    externalCatalog.getTableOption(db, table)
303  }
304
305  /**
306   * Load files stored in given path into an existing metastore table.
307   * If no database is specified, assume the table is in the current database.
308   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
309   */
310  def loadTable(
311      name: TableIdentifier,
312      loadPath: String,
313      isOverwrite: Boolean,
314      holdDDLTime: Boolean): Unit = {
315    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
316    val table = formatTableName(name.table)
317    requireDbExists(db)
318    requireTableExists(TableIdentifier(table, Some(db)))
319    externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
320  }
321
322  /**
323   * Load files stored in given path into the partition of an existing metastore table.
324   * If no database is specified, assume the table is in the current database.
325   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
326   */
327  def loadPartition(
328      name: TableIdentifier,
329      loadPath: String,
330      spec: TablePartitionSpec,
331      isOverwrite: Boolean,
332      holdDDLTime: Boolean,
333      inheritTableSpecs: Boolean): Unit = {
334    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
335    val table = formatTableName(name.table)
336    requireDbExists(db)
337    requireTableExists(TableIdentifier(table, Some(db)))
338    requireNonEmptyValueInPartitionSpec(Seq(spec))
339    externalCatalog.loadPartition(
340      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs)
341  }
342
343  def defaultTablePath(tableIdent: TableIdentifier): String = {
344    val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
345    val dbLocation = getDatabaseMetadata(dbName).locationUri
346
347    new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
348  }
349
350  // ----------------------------------------------
351  // | Methods that interact with temp views only |
352  // ----------------------------------------------
353
354  /**
355   * Create a local temporary view.
356   */
357  def createTempView(
358      name: String,
359      tableDefinition: LogicalPlan,
360      overrideIfExists: Boolean): Unit = synchronized {
361    val table = formatTableName(name)
362    if (tempTables.contains(table) && !overrideIfExists) {
363      throw new TempTableAlreadyExistsException(name)
364    }
365    tempTables.put(table, tableDefinition)
366  }
367
368  /**
369   * Create a global temporary view.
370   */
371  def createGlobalTempView(
372      name: String,
373      viewDefinition: LogicalPlan,
374      overrideIfExists: Boolean): Unit = {
375    globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
376  }
377
378  /**
379   * Alter the definition of a local/global temp view matching the given name, returns true if a
380   * temp view is matched and altered, false otherwise.
381   */
382  def alterTempViewDefinition(
383      name: TableIdentifier,
384      viewDefinition: LogicalPlan): Boolean = synchronized {
385    val viewName = formatTableName(name.table)
386    if (name.database.isEmpty) {
387      if (tempTables.contains(viewName)) {
388        createTempView(viewName, viewDefinition, overrideIfExists = true)
389        true
390      } else {
391        false
392      }
393    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
394      globalTempViewManager.update(viewName, viewDefinition)
395    } else {
396      false
397    }
398  }
399
400  /**
401   * Return a local temporary view exactly as it was stored.
402   */
403  def getTempView(name: String): Option[LogicalPlan] = synchronized {
404    tempTables.get(formatTableName(name))
405  }
406
407  /**
408   * Return a global temporary view exactly as it was stored.
409   */
410  def getGlobalTempView(name: String): Option[LogicalPlan] = {
411    globalTempViewManager.get(formatTableName(name))
412  }
413
414  /**
415   * Drop a local temporary view.
416   *
417   * Returns true if this view is dropped successfully, false otherwise.
418   */
419  def dropTempView(name: String): Boolean = synchronized {
420    tempTables.remove(formatTableName(name)).isDefined
421  }
422
423  /**
424   * Drop a global temporary view.
425   *
426   * Returns true if this view is dropped successfully, false otherwise.
427   */
428  def dropGlobalTempView(name: String): Boolean = {
429    globalTempViewManager.remove(formatTableName(name))
430  }
431
432  // -------------------------------------------------------------
433  // | Methods that interact with temporary and metastore tables |
434  // -------------------------------------------------------------
435
436  /**
437   * Retrieve the metadata of an existing temporary view or permanent table/view.
438   *
439   * If a database is specified in `name`, this will return the metadata of table/view in that
440   * database.
441   * If no database is specified, this will first attempt to get the metadata of a temporary view
442   * with the same name, then, if that does not exist, return the metadata of table/view in the
443   * current database.
444   */
445  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
446    val table = formatTableName(name.table)
447    if (name.database.isEmpty) {
448      getTempView(table).map { plan =>
449        CatalogTable(
450          identifier = TableIdentifier(table),
451          tableType = CatalogTableType.VIEW,
452          storage = CatalogStorageFormat.empty,
453          schema = plan.output.toStructType)
454      }.getOrElse(getTableMetadata(name))
455    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
456      globalTempViewManager.get(table).map { plan =>
457        CatalogTable(
458          identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
459          tableType = CatalogTableType.VIEW,
460          storage = CatalogStorageFormat.empty,
461          schema = plan.output.toStructType)
462      }.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
463    } else {
464      getTableMetadata(name)
465    }
466  }
467
468  /**
469   * Rename a table.
470   *
471   * If a database is specified in `oldName`, this will rename the table in that database.
472   * If no database is specified, this will first attempt to rename a temporary table with
473   * the same name, then, if that does not exist, rename the table in the current database.
474   *
475   * This assumes the database specified in `newName` matches the one in `oldName`.
476   */
477  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
478    val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
479    newName.database.map(formatDatabaseName).foreach { newDb =>
480      if (db != newDb) {
481        throw new AnalysisException(
482          s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
483      }
484    }
485
486    val oldTableName = formatTableName(oldName.table)
487    val newTableName = formatTableName(newName.table)
488    if (db == globalTempViewManager.database) {
489      globalTempViewManager.rename(oldTableName, newTableName)
490    } else {
491      requireDbExists(db)
492      if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
493        requireTableExists(TableIdentifier(oldTableName, Some(db)))
494        requireTableNotExists(TableIdentifier(newTableName, Some(db)))
495        validateName(newTableName)
496        externalCatalog.renameTable(db, oldTableName, newTableName)
497      } else {
498        if (newName.database.isDefined) {
499          throw new AnalysisException(
500            s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': cannot specify database " +
501              s"name '${newName.database.get}' in the destination table")
502        }
503        if (tempTables.contains(newTableName)) {
504          throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
505            "destination table already exists")
506        }
507        val table = tempTables(oldTableName)
508        tempTables.remove(oldTableName)
509        tempTables.put(newTableName, table)
510      }
511    }
512  }
513
514  /**
515   * Drop a table.
516   *
517   * If a database is specified in `name`, this will drop the table from that database.
518   * If no database is specified, this will first attempt to drop a temporary table with
519   * the same name, then, if that does not exist, drop the table from the current database.
520   */
521  def dropTable(
522      name: TableIdentifier,
523      ignoreIfNotExists: Boolean,
524      purge: Boolean): Unit = synchronized {
525    val db = formatDatabaseName(name.database.getOrElse(currentDb))
526    val table = formatTableName(name.table)
527    if (db == globalTempViewManager.database) {
528      val viewExists = globalTempViewManager.remove(table)
529      if (!viewExists && !ignoreIfNotExists) {
530        throw new NoSuchTableException(globalTempViewManager.database, table)
531      }
532    } else {
533      if (name.database.isDefined || !tempTables.contains(table)) {
534        requireDbExists(db)
535        // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
536        // Instead, log it as an error message.
537        if (tableExists(TableIdentifier(table, Option(db)))) {
538          externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
539        } else if (!ignoreIfNotExists) {
540          throw new NoSuchTableException(db = db, table = table)
541        }
542      } else {
543        tempTables.remove(table)
544      }
545    }
546  }
547
548  /**
549   * Return a [[LogicalPlan]] that represents the given table or view.
550   *
551   * If a database is specified in `name`, this will return the table/view from that database.
552   * If no database is specified, this will first attempt to return a temporary table/view with
553   * the same name, then, if that does not exist, return the table/view from the current database.
554   *
555   * Note that, the global temp view database is also valid here, this will return the global temp
556   * view matching the given name.
557   *
558   * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
559   * track the name of the view.
560   */
561  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
562    synchronized {
563      val db = formatDatabaseName(name.database.getOrElse(currentDb))
564      val table = formatTableName(name.table)
565      val relationAlias = alias.getOrElse(table)
566      if (db == globalTempViewManager.database) {
567        globalTempViewManager.get(table).map { viewDef =>
568          SubqueryAlias(relationAlias, viewDef, Some(name))
569        }.getOrElse(throw new NoSuchTableException(db, table))
570      } else if (name.database.isDefined || !tempTables.contains(table)) {
571        val metadata = externalCatalog.getTable(db, table)
572        val view = Option(metadata.tableType).collect {
573          case CatalogTableType.VIEW => name
574        }
575        SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)
576      } else {
577        SubqueryAlias(relationAlias, tempTables(table), Option(name))
578      }
579    }
580  }
581
582  /**
583   * Return whether a table with the specified name is a temporary table.
584   *
585   * Note: The temporary table cache is checked only when database is not
586   * explicitly specified.
587   */
588  def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
589    val table = formatTableName(name.table)
590    if (name.database.isEmpty) {
591      tempTables.contains(table)
592    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
593      globalTempViewManager.get(table).isDefined
594    } else {
595      false
596    }
597  }
598
599  /**
600   * List all tables in the specified database, including local temporary tables.
601   *
602   * Note that, if the specified database is global temporary view database, we will list global
603   * temporary views.
604   */
605  def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
606
607  /**
608   * List all matching tables in the specified database, including local temporary tables.
609   *
610   * Note that, if the specified database is global temporary view database, we will list global
611   * temporary views.
612   */
613  def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
614    val dbName = formatDatabaseName(db)
615    val dbTables = if (dbName == globalTempViewManager.database) {
616      globalTempViewManager.listViewNames(pattern).map { name =>
617        TableIdentifier(name, Some(globalTempViewManager.database))
618      }
619    } else {
620      requireDbExists(dbName)
621      externalCatalog.listTables(dbName, pattern).map { name =>
622        TableIdentifier(name, Some(dbName))
623      }
624    }
625    val localTempViews = synchronized {
626      StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name =>
627        TableIdentifier(name)
628      }
629    }
630    dbTables ++ localTempViews
631  }
632
633  /**
634   * Refresh the cache entry for a metastore table, if any.
635   */
636  def refreshTable(name: TableIdentifier): Unit = synchronized {
637    // Go through temporary tables and invalidate them.
638    // If the database is defined, this is definitely not a temp table.
639    // If the database is not defined, there is a good chance this is a temp table.
640    if (name.database.isEmpty) {
641      tempTables.get(formatTableName(name.table)).foreach(_.refresh())
642    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
643      globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
644    }
645  }
646
647  /**
648   * Drop all existing temporary tables.
649   * For testing only.
650   */
651  def clearTempTables(): Unit = synchronized {
652    tempTables.clear()
653  }
654
655  // ----------------------------------------------------------------------------
656  // Partitions
657  // ----------------------------------------------------------------------------
658  // All methods in this category interact directly with the underlying catalog.
659  // These methods are concerned with only metastore tables.
660  // ----------------------------------------------------------------------------
661
662  // TODO: We need to figure out how these methods interact with our data source
663  // tables. For such tables, we do not store values of partitioning columns in
664  // the metastore. For now, partition values of a data source table will be
665  // automatically discovered when we load the table.
666
667  /**
668   * Create partitions in an existing table, assuming it exists.
669   * If no database is specified, assume the table is in the current database.
670   */
671  def createPartitions(
672      tableName: TableIdentifier,
673      parts: Seq[CatalogTablePartition],
674      ignoreIfExists: Boolean): Unit = {
675    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
676    val table = formatTableName(tableName.table)
677    requireDbExists(db)
678    requireTableExists(TableIdentifier(table, Option(db)))
679    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
680    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
681    externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
682  }
683
684  /**
685   * Drop partitions from a table, assuming they exist.
686   * If no database is specified, assume the table is in the current database.
687   */
688  def dropPartitions(
689      tableName: TableIdentifier,
690      specs: Seq[TablePartitionSpec],
691      ignoreIfNotExists: Boolean,
692      purge: Boolean,
693      retainData: Boolean): Unit = {
694    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
695    val table = formatTableName(tableName.table)
696    requireDbExists(db)
697    requireTableExists(TableIdentifier(table, Option(db)))
698    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
699    requireNonEmptyValueInPartitionSpec(specs)
700    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
701  }
702
703  /**
704   * Override the specs of one or many existing table partitions, assuming they exist.
705   *
706   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
707   * If no database is specified, assume the table is in the current database.
708   */
709  def renamePartitions(
710      tableName: TableIdentifier,
711      specs: Seq[TablePartitionSpec],
712      newSpecs: Seq[TablePartitionSpec]): Unit = {
713    val tableMetadata = getTableMetadata(tableName)
714    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
715    val table = formatTableName(tableName.table)
716    requireDbExists(db)
717    requireTableExists(TableIdentifier(table, Option(db)))
718    requireExactMatchedPartitionSpec(specs, tableMetadata)
719    requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
720    requireNonEmptyValueInPartitionSpec(specs)
721    requireNonEmptyValueInPartitionSpec(newSpecs)
722    externalCatalog.renamePartitions(db, table, specs, newSpecs)
723  }
724
725  /**
   * Alter one or many table partitions whose specs match those specified in `parts`,
727   * assuming the partitions exist.
728   *
729   * If no database is specified, assume the table is in the current database.
730   *
731   * Note: If the underlying implementation does not support altering a certain field,
732   * this becomes a no-op.
733   */
734  def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
735    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
736    val table = formatTableName(tableName.table)
737    requireDbExists(db)
738    requireTableExists(TableIdentifier(table, Option(db)))
739    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
740    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
741    externalCatalog.alterPartitions(db, table, parts)
742  }
743
744  /**
745   * Retrieve the metadata of a table partition, assuming it exists.
746   * If no database is specified, assume the table is in the current database.
747   */
748  def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
749    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
750    val table = formatTableName(tableName.table)
751    requireDbExists(db)
752    requireTableExists(TableIdentifier(table, Option(db)))
753    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
754    requireNonEmptyValueInPartitionSpec(Seq(spec))
755    externalCatalog.getPartition(db, table, spec)
756  }
757
758  /**
759   * List the names of all partitions that belong to the specified table, assuming it exists.
760   *
761   * A partial partition spec may optionally be provided to filter the partitions returned.
762   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
763   * then a partial spec of (a='1') will return the first two only.
764   */
765  def listPartitionNames(
766      tableName: TableIdentifier,
767      partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
768    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
769    val table = formatTableName(tableName.table)
770    requireDbExists(db)
771    requireTableExists(TableIdentifier(table, Option(db)))
772    partialSpec.foreach { spec =>
773      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
774      requireNonEmptyValueInPartitionSpec(Seq(spec))
775    }
776    externalCatalog.listPartitionNames(db, table, partialSpec)
777  }
778
779  /**
780   * List the metadata of all partitions that belong to the specified table, assuming it exists.
781   *
782   * A partial partition spec may optionally be provided to filter the partitions returned.
783   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
784   * then a partial spec of (a='1') will return the first two only.
785   */
786  def listPartitions(
787      tableName: TableIdentifier,
788      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
789    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
790    val table = formatTableName(tableName.table)
791    requireDbExists(db)
792    requireTableExists(TableIdentifier(table, Option(db)))
793    partialSpec.foreach { spec =>
794      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
795      requireNonEmptyValueInPartitionSpec(Seq(spec))
796    }
797    externalCatalog.listPartitions(db, table, partialSpec)
798  }
799
800  /**
801   * List the metadata of partitions that belong to the specified table, assuming it exists, that
802   * satisfy the given partition-pruning predicate expressions.
803   */
804  def listPartitionsByFilter(
805      tableName: TableIdentifier,
806      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
807    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
808    val table = formatTableName(tableName.table)
809    requireDbExists(db)
810    requireTableExists(TableIdentifier(table, Option(db)))
811    externalCatalog.listPartitionsByFilter(db, table, predicates)
812  }
813
814  /**
815   * Verify if the input partition spec has any empty value.
816   */
817  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
818    specs.foreach { s =>
819      if (s.values.exists(_.isEmpty)) {
820        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
821        throw new AnalysisException(
822          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
823      }
824    }
825  }
826
827  /**
828   * Verify if the input partition spec exactly matches the existing defined partition spec
829   * The columns must be the same but the orders could be different.
830   */
831  private def requireExactMatchedPartitionSpec(
832      specs: Seq[TablePartitionSpec],
833      table: CatalogTable): Unit = {
834    val defined = table.partitionColumnNames.sorted
835    specs.foreach { s =>
836      if (s.keys.toSeq.sorted != defined) {
837        throw new AnalysisException(
838          s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
839            s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
840            s"table '${table.identifier}'")
841      }
842    }
843  }
844
845  /**
846   * Verify if the input partition spec partially matches the existing defined partition spec
847   * That is, the columns of partition spec should be part of the defined partition spec.
848   */
849  private def requirePartialMatchedPartitionSpec(
850      specs: Seq[TablePartitionSpec],
851      table: CatalogTable): Unit = {
852    val defined = table.partitionColumnNames
853    specs.foreach { s =>
854      if (!s.keys.forall(defined.contains)) {
855        throw new AnalysisException(
856          s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " +
857            s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
858            s"in table '${table.identifier}'")
859      }
860    }
861  }
862
863  // ----------------------------------------------------------------------------
864  // Functions
865  // ----------------------------------------------------------------------------
866  // There are two kinds of functions, temporary functions and metastore
867  // functions (permanent UDFs). Temporary functions are isolated across
868  // sessions. Metastore functions can be used across multiple sessions as
869  // their metadata is persisted in the underlying catalog.
870  // ----------------------------------------------------------------------------
871
872  // -------------------------------------------------------
873  // | Methods that interact with metastore functions only |
874  // -------------------------------------------------------
875
876  /**
877   * Create a metastore function in the database specified in `funcDefinition`.
878   * If no such database is specified, create it in the current database.
879   */
880  def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
881    val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
882    requireDbExists(db)
883    val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
884    val newFuncDefinition = funcDefinition.copy(identifier = identifier)
885    if (!functionExists(identifier)) {
886      externalCatalog.createFunction(db, newFuncDefinition)
887    } else if (!ignoreIfExists) {
888      throw new FunctionAlreadyExistsException(db = db, func = identifier.toString)
889    }
890  }
891
892  /**
893   * Drop a metastore function.
894   * If no database is specified, assume the function is in the current database.
895   */
896  def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
897    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
898    requireDbExists(db)
899    val identifier = name.copy(database = Some(db))
900    if (functionExists(identifier)) {
901      // TODO: registry should just take in FunctionIdentifier for type safety
902      if (functionRegistry.functionExists(identifier.unquotedString)) {
903        // If we have loaded this function into the FunctionRegistry,
904        // also drop it from there.
905        // For a permanent function, because we loaded it to the FunctionRegistry
906        // when it's first used, we also need to drop it from the FunctionRegistry.
907        functionRegistry.dropFunction(identifier.unquotedString)
908      }
909      externalCatalog.dropFunction(db, name.funcName)
910    } else if (!ignoreIfNotExists) {
911      throw new NoSuchFunctionException(db = db, func = identifier.toString)
912    }
913  }
914
915  /**
916   * Retrieve the metadata of a metastore function.
917   *
918   * If a database is specified in `name`, this will return the function in that database.
919   * If no database is specified, this will return the function in the current database.
920   */
921  def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
922    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
923    requireDbExists(db)
924    externalCatalog.getFunction(db, name.funcName)
925  }
926
927  /**
928   * Check if the specified function exists.
929   */
930  def functionExists(name: FunctionIdentifier): Boolean = {
931    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
932    requireDbExists(db)
933    functionRegistry.functionExists(name.unquotedString) ||
934      externalCatalog.functionExists(db, name.funcName)
935  }
936
937  // ----------------------------------------------------------------
938  // | Methods that interact with temporary and metastore functions |
939  // ----------------------------------------------------------------
940
941  /**
942   * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
943   *
944   * This performs reflection to decide what type of [[Expression]] to return in the builder.
945   */
946  def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
947    // TODO: at least support UDAFs here
948    throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
949  }
950
951  /**
952   * Loads resources such as JARs and Files for a function. Every resource is represented
953   * by a tuple (resource type, resource uri).
954   */
955  def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
956    resources.foreach(functionResourceLoader.loadResource)
957  }
958
959  /**
960   * Create a temporary function.
961   * This assumes no database is specified in `funcDefinition`.
962   */
963  def createTempFunction(
964      name: String,
965      info: ExpressionInfo,
966      funcDefinition: FunctionBuilder,
967      ignoreIfExists: Boolean): Unit = {
968    if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
969      throw new TempFunctionAlreadyExistsException(name)
970    }
971    functionRegistry.registerFunction(name, info, funcDefinition)
972  }
973
974  /**
975   * Drop a temporary function.
976   */
977  def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
978    if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) {
979      throw new NoSuchTempFunctionException(name)
980    }
981  }
982
983  /**
984   * Returns whether it is a temporary function. If not existed, returns false.
985   */
986  def isTemporaryFunction(name: FunctionIdentifier): Boolean = {
987    // copied from HiveSessionCatalog
988    val hiveFunctions = Seq("histogram_numeric")
989
990    // A temporary function is a function that has been registered in functionRegistry
991    // without a database name, and is neither a built-in function nor a Hive function
992    name.database.isEmpty &&
993      functionRegistry.functionExists(name.funcName) &&
994      !FunctionRegistry.builtin.functionExists(name.funcName) &&
995      !hiveFunctions.contains(name.funcName.toLowerCase)
996  }
997
998  protected def failFunctionLookup(name: String): Nothing = {
999    throw new NoSuchFunctionException(db = currentDb, func = name)
1000  }
1001
1002  /**
1003   * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
1004   */
1005  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
1006    // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
1007    val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
1008    val qualifiedName = name.copy(database = database)
1009    functionRegistry.lookupFunction(name.funcName)
1010      .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
1011      .getOrElse {
1012        val db = qualifiedName.database.get
1013        requireDbExists(db)
1014        if (externalCatalog.functionExists(db, name.funcName)) {
1015          val metadata = externalCatalog.getFunction(db, name.funcName)
1016          new ExpressionInfo(
1017            metadata.className,
1018            qualifiedName.database.orNull,
1019            qualifiedName.identifier)
1020        } else {
1021          failFunctionLookup(name.funcName)
1022        }
1023      }
1024  }
1025
1026  /**
1027   * Return an [[Expression]] that represents the specified function, assuming it exists.
1028   *
1029   * For a temporary function or a permanent function that has been loaded,
1030   * this method will simply lookup the function through the
1031   * FunctionRegistry and create an expression based on the builder.
1032   *
1033   * For a permanent function that has not been loaded, we will first fetch its metadata
1034   * from the underlying external catalog. Then, we will load all resources associated
1035   * with this function (i.e. jars and files). Finally, we create a function builder
1036   * based on the function class and put the builder into the FunctionRegistry.
1037   * The name of this function in the FunctionRegistry will be `databaseName.functionName`.
1038   */
1039  def lookupFunction(
1040      name: FunctionIdentifier,
1041      children: Seq[Expression]): Expression = synchronized {
1042    // Note: the implementation of this function is a little bit convoluted.
1043    // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
1044    // (built-in, temp, and external).
1045    if (name.database.isEmpty && functionRegistry.functionExists(name.funcName)) {
1046      // This function has been already loaded into the function registry.
1047      return functionRegistry.lookupFunction(name.funcName, children)
1048    }
1049
1050    // If the name itself is not qualified, add the current database to it.
1051    val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
1052    val qualifiedName = name.copy(database = database)
1053
1054    if (functionRegistry.functionExists(qualifiedName.unquotedString)) {
1055      // This function has been already loaded into the function registry.
1056      // Unlike the above block, we find this function by using the qualified name.
1057      return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
1058    }
1059
1060    // The function has not been loaded to the function registry, which means
1061    // that the function is a permanent function (if it actually has been registered
1062    // in the metastore). We need to first put the function in the FunctionRegistry.
1063    // TODO: why not just check whether the function exists first?
1064    val catalogFunction = try {
1065      externalCatalog.getFunction(currentDb, name.funcName)
1066    } catch {
1067      case e: AnalysisException => failFunctionLookup(name.funcName)
1068      case e: NoSuchPermanentFunctionException => failFunctionLookup(name.funcName)
1069    }
1070    loadFunctionResources(catalogFunction.resources)
1071    // Please note that qualifiedName is provided by the user. However,
1072    // catalogFunction.identifier.unquotedString is returned by the underlying
1073    // catalog. So, it is possible that qualifiedName is not exactly the same as
1074    // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
1075    // At here, we preserve the input from the user.
1076    val info = new ExpressionInfo(
1077      catalogFunction.className,
1078      qualifiedName.database.orNull,
1079      qualifiedName.funcName)
1080    val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
1081    createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
1082    // Now, we need to create the Expression.
1083    functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
1084  }
1085
1086  /**
1087   * List all functions in the specified database, including temporary functions. This
1088   * returns the function identifier and the scope in which it was defined (system or user
1089   * defined).
1090   */
1091  def listFunctions(db: String): Seq[(FunctionIdentifier, String)] = listFunctions(db, "*")
1092
1093  /**
1094   * List all matching functions in the specified database, including temporary functions. This
1095   * returns the function identifier and the scope in which it was defined (system or user
1096   * defined).
1097   */
1098  def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
1099    val dbName = formatDatabaseName(db)
1100    requireDbExists(dbName)
1101    val dbFunctions = externalCatalog.listFunctions(dbName, pattern)
1102      .map { f => FunctionIdentifier(f, Some(dbName)) }
1103    val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern)
1104      .map { f => FunctionIdentifier(f) }
1105    val functions = dbFunctions ++ loadedFunctions
1106    functions.map {
1107      case f if FunctionRegistry.functionSet.contains(f.funcName) => (f, "SYSTEM")
1108      case f => (f, "USER")
1109    }
1110  }
1111
1112
1113  // -----------------
1114  // | Other methods |
1115  // -----------------
1116
1117  /**
1118   * Drop all existing databases (except "default"), tables, partitions and functions,
1119   * and set the current database to "default".
1120   *
1121   * This is mainly used for tests.
1122   */
1123  def reset(): Unit = synchronized {
1124    setCurrentDatabase(DEFAULT_DATABASE)
1125    listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db =>
1126      dropDatabase(db, ignoreIfNotExists = false, cascade = true)
1127    }
1128    listTables(DEFAULT_DATABASE).foreach { table =>
1129      dropTable(table, ignoreIfNotExists = false, purge = false)
1130    }
1131    listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
1132      if (func.database.isDefined) {
1133        dropFunction(func, ignoreIfNotExists = false)
1134      } else {
1135        dropTempFunction(func.funcName, ignoreIfNotExists = false)
1136      }
1137    }
1138    tempTables.clear()
1139    globalTempViewManager.clear()
1140    functionRegistry.clear()
1141    // restore built-in functions
1142    FunctionRegistry.builtin.listFunction().foreach { f =>
1143      val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
1144      val functionBuilder = FunctionRegistry.builtin.lookupFunctionBuilder(f)
1145      require(expressionInfo.isDefined, s"built-in function '$f' is missing expression info")
1146      require(functionBuilder.isDefined, s"built-in function '$f' is missing function builder")
1147      functionRegistry.registerFunction(f, expressionInfo.get, functionBuilder.get)
1148    }
1149  }
1150
1151}
1152