/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.catalog

import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.util.StringUtils

/** Constants shared by all [[SessionCatalog]] instances. */
object SessionCatalog {
  /**
   * Name of the database a [[SessionCatalog]] starts in; `dropDatabase` refuses to drop it.
   */
  val DEFAULT_DATABASE: String = "default"
}

/**
 * An internal catalog that is used by a Spark Session. This internal catalog serves as a
 * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
 * tables and functions of the Spark Session that it belongs to.
*
 * This class must be thread-safe.
 */
class SessionCatalog(
    externalCatalog: ExternalCatalog,
    globalTempViewManager: GlobalTempViewManager,
    functionResourceLoader: FunctionResourceLoader,
    functionRegistry: FunctionRegistry,
    conf: CatalystConf,
    hadoopConf: Configuration) extends Logging {
  import SessionCatalog._
  import CatalogTypes.TablePartitionSpec

  // For testing only.
  def this(
      externalCatalog: ExternalCatalog,
      functionRegistry: FunctionRegistry,
      conf: CatalystConf) {
    this(
      externalCatalog,
      new GlobalTempViewManager("global_temp"),
      DummyFunctionResourceLoader,
      functionRegistry,
      conf,
      new Configuration())
  }

  // For testing only.
  def this(externalCatalog: ExternalCatalog) {
    this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true))
  }

  /** List of temporary tables, mapping from table name to their logical plan. */
  @GuardedBy("this")
  protected val tempTables = new mutable.HashMap[String, LogicalPlan]

  // Note: we track current database here because certain operations do not explicitly
  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
  // check whether the temporary table or function exists, then, if not, operate on
  // the corresponding item in the current database.
  @GuardedBy("this")
  protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)

  /** Pattern accepted by `validateName`; compiled once instead of on every call. */
  private[this] val validNameFormat = "([\\w_]+)".r

  /**
   * Checks if the given name conforms to the Hive standard ("[a-zA-Z_0-9]+"),
   * i.e. if this name only contains characters, numbers, and _.
   *
   * This method is intended to have the same behavior of
   * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
   *
   * Throws an [[AnalysisException]] when the whole name does not match the pattern.
   */
  private def validateName(name: String): Unit = {
    if (!validNameFormat.pattern.matcher(name).matches()) {
      throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
        "Valid names only contain alphabet characters, numbers and _.")
    }
  }

  /**
   * Format table name, taking into account case sensitivity.
   *
   * When analysis is case insensitive the name is lower-cased with `Locale.ROOT`, so the
   * result does not depend on the JVM default locale (e.g. the Turkish dotless i).
   */
  protected[this] def formatTableName(name: String): String = {
    if (conf.caseSensitiveAnalysis) name else name.toLowerCase(java.util.Locale.ROOT)
  }

  /**
   * Format database name, taking into account case sensitivity.
   *
   * When analysis is case insensitive the name is lower-cased with `Locale.ROOT`, so the
   * result does not depend on the JVM default locale.
   */
  protected[this] def formatDatabaseName(name: String): String = {
    if (conf.caseSensitiveAnalysis) name else name.toLowerCase(java.util.Locale.ROOT)
  }

  /**
   * This method is used to make the given path qualified before we
   * store this path in the underlying external catalog. So, when a path
   * does not contain a scheme, this path will not be changed after the default
   * FileSystem is changed.
   */
  private def makeQualifiedPath(path: String): Path = {
    val hadoopPath = new Path(path)
    // The file system is resolved from the path itself using this catalog's Hadoop conf.
    hadoopPath.getFileSystem(hadoopConf).makeQualified(hadoopPath)
  }

  /** Throws [[NoSuchDatabaseException]] unless `databaseExists(db)` holds. */
  private def requireDbExists(db: String): Unit = {
    if (!databaseExists(db)) {
      throw new NoSuchDatabaseException(db)
    }
  }

  /**
   * Throws [[NoSuchTableException]] unless `tableExists(name)` holds. The database reported
   * in the exception falls back to the current database, read under the catalog lock via
   * `getCurrentDatabase` because `currentDb` is guarded by `this`.
   */
  private def requireTableExists(name: TableIdentifier): Unit = {
    if (!tableExists(name)) {
      val db = name.database.getOrElse(getCurrentDatabase)
      throw new NoSuchTableException(db = db, table = name.table)
    }
  }

  /**
   * Throws [[TableAlreadyExistsException]] if `tableExists(name)` holds. The database
   * reported in the exception falls back to the current database, read under the catalog
   * lock via `getCurrentDatabase` because `currentDb` is guarded by `this`.
   */
  private def requireTableNotExists(name: TableIdentifier): Unit = {
    if (tableExists(name)) {
      val db = name.database.getOrElse(getCurrentDatabase)
      throw new TableAlreadyExistsException(db = db, table = name.table)
    }
  }

  // ----------------------------------------------------------------------------
  // Databases
  // ----------------------------------------------------------------------------
  // All methods in this category interact directly with the underlying catalog.
// ----------------------------------------------------------------------------

  /**
   * Create a database in the external catalog.
   *
   * The name is formatted and validated, and the location is qualified before being stored.
   * An [[AnalysisException]] is thrown when the formatted name equals the name of the
   * global temporary view database.
   */
  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    val formattedName = formatDatabaseName(dbDefinition.name)
    if (formattedName == globalTempViewManager.database) {
      throw new AnalysisException(
        s"${globalTempViewManager.database} is a system preserved database, " +
          "you cannot create a database with this name.")
    }
    validateName(formattedName)
    val normalized = dbDefinition.copy(
      name = formattedName,
      locationUri = makeQualifiedPath(dbDefinition.locationUri).toString)
    externalCatalog.createDatabase(normalized, ignoreIfExists)
  }

  /**
   * Drop a database from the external catalog. Dropping the default database is rejected
   * with an [[AnalysisException]].
   */
  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    val formattedName = formatDatabaseName(db)
    if (formattedName == DEFAULT_DATABASE) {
      throw new AnalysisException(s"Can not drop default database")
    }
    externalCatalog.dropDatabase(formattedName, ignoreIfNotExists, cascade)
  }

  /** Alter an existing database; fails with [[NoSuchDatabaseException]] if it is missing. */
  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    val formattedName = formatDatabaseName(dbDefinition.name)
    requireDbExists(formattedName)
    val renamed = dbDefinition.copy(name = formattedName)
    externalCatalog.alterDatabase(renamed)
  }

  /** Return the metadata of an existing database, or throw [[NoSuchDatabaseException]]. */
  def getDatabaseMetadata(db: String): CatalogDatabase = {
    val formattedName = formatDatabaseName(db)
    requireDbExists(formattedName)
    externalCatalog.getDatabase(formattedName)
  }

  /** Whether the external catalog knows a database with the (formatted) given name. */
  def databaseExists(db: String): Boolean =
    externalCatalog.databaseExists(formatDatabaseName(db))

  /** List the names of all databases in the external catalog. */
  def listDatabases(): Seq[String] = externalCatalog.listDatabases()

  /** List the names of the databases in the external catalog that match `pattern`. */
  def listDatabases(pattern: String): Seq[String] = externalCatalog.listDatabases(pattern)

  /** Return the current database; the read is done while holding the catalog lock. */
  def getCurrentDatabase: String = synchronized {
    currentDb
  }

  /**
   * Make `db` the current database. The global temporary view database is rejected with an
   * [[AnalysisException]], and the database must already exist.
   */
  def setCurrentDatabase(db: String): Unit = {
    val target = formatDatabaseName(db)
    if (target == globalTempViewManager.database) {
      throw new AnalysisException(
        s"${globalTempViewManager.database} is a system preserved database, " +
          "you cannot use it as current database. To access global temporary views, you should " +
          "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " +
          s"${globalTempViewManager.database}.viewName.")
    }
    requireDbExists(target)
    synchronized {
      currentDb = target
    }
  }

  /**
   * Get the path for creating a non-default database when database location is not provided
   * by users.
   */
  def getDefaultDBPath(db: String): String = {
    val warehouseRoot = new Path(conf.warehousePath)
    new Path(warehouseRoot, formatDatabaseName(db) + ".db").toString
  }

  // ----------------------------------------------------------------------------
  // Tables
  // ----------------------------------------------------------------------------
  // There are two kinds of tables, temporary tables and metastore tables.
  // Temporary tables are isolated across sessions and do not belong to any
  // particular database. Metastore tables can be used across multiple
  // sessions as their metadata is persisted in the underlying catalog.
  // ----------------------------------------------------------------------------

  // ----------------------------------------------------
  // | Methods that interact with metastore tables only |
  // ----------------------------------------------------

  /**
   * Create a metastore table in the database specified in `tableDefinition`.
   * If no such database is specified, create it in the current database.
*/
  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    val ident = tableDefinition.identifier
    val dbName = formatDatabaseName(ident.database.getOrElse(getCurrentDatabase))
    val tableName = formatTableName(ident.table)
    // The table name is validated before the database lookup.
    validateName(tableName)
    val qualified = tableDefinition.copy(identifier = TableIdentifier(tableName, Some(dbName)))
    requireDbExists(dbName)
    externalCatalog.createTable(qualified, ignoreIfExists)
  }

  /**
   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
   *
   * If no database is specified in `tableDefinition`, assume the table is in the
   * current database.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterTable(tableDefinition: CatalogTable): Unit = {
    val ident = tableDefinition.identifier
    val dbName = formatDatabaseName(ident.database.getOrElse(getCurrentDatabase))
    val qualifiedIdent = TableIdentifier(formatTableName(ident.table), Some(dbName))
    val qualified = tableDefinition.copy(identifier = qualifiedIdent)
    requireDbExists(dbName)
    requireTableExists(qualifiedIdent)
    externalCatalog.alterTable(qualified)
  }

  /**
   * Return whether a table/view with the specified name exists. If no database is specified, check
   * with current database.
   */
  def tableExists(name: TableIdentifier): Boolean = synchronized {
    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
    externalCatalog.tableExists(dbName, formatTableName(name.table))
  }

  /**
   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
   * assume the table/view is in the current database. If the specified table/view is not found
   * in the database then a [[NoSuchTableException]] is thrown.
*/
  def getTableMetadata(name: TableIdentifier): CatalogTable = {
    val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    val tableName = formatTableName(name.table)
    requireDbExists(dbName)
    val qualified = TableIdentifier(tableName, Some(dbName))
    requireTableExists(qualified)
    externalCatalog.getTable(dbName, tableName)
  }

  /**
   * Retrieve the metadata of a metastore table as an `Option`.
   * If no database is specified, assume the table is in the current database.
   * The database must exist; the result is whatever the external catalog's `getTableOption`
   * returns for the formatted names.
   */
  def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = {
    val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    requireDbExists(dbName)
    externalCatalog.getTableOption(dbName, formatTableName(name.table))
  }

  /**
   * Load files stored in given path into an existing metastore table.
   * If no database is specified, assume the table is in the current database.
   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
   */
  def loadTable(
      name: TableIdentifier,
      loadPath: String,
      isOverwrite: Boolean,
      holdDDLTime: Boolean): Unit = {
    val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    val tableName = formatTableName(name.table)
    requireDbExists(dbName)
    val qualified = TableIdentifier(tableName, Some(dbName))
    requireTableExists(qualified)
    externalCatalog.loadTable(dbName, tableName, loadPath, isOverwrite, holdDDLTime)
  }

  /**
   * Load files stored in given path into the partition of an existing metastore table.
   * If no database is specified, assume the table is in the current database.
   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
*/
  def loadPartition(
      name: TableIdentifier,
      loadPath: String,
      spec: TablePartitionSpec,
      isOverwrite: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean): Unit = {
    val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    val tableName = formatTableName(name.table)
    requireDbExists(dbName)
    val qualified = TableIdentifier(tableName, Some(dbName))
    requireTableExists(qualified)
    // Only empty partition values are rejected here; column names are not checked.
    requireNonEmptyValueInPartitionSpec(Seq(spec))
    externalCatalog.loadPartition(
      dbName, tableName, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs)
  }

  /**
   * Return the path of the table under the location of its database. If no database is
   * specified, the current database is used.
   */
  def defaultTablePath(tableIdent: TableIdentifier): String = {
    val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
    val dbRoot = new Path(getDatabaseMetadata(dbName).locationUri)
    new Path(dbRoot, formatTableName(tableIdent.table)).toString
  }

  // ----------------------------------------------
  // | Methods that interact with temp views only |
  // ----------------------------------------------

  /**
   * Create a local temporary view.
   *
   * Throws [[TempTableAlreadyExistsException]] when a view with the same formatted name is
   * already registered and `overrideIfExists` is false.
   */
  def createTempView(
      name: String,
      tableDefinition: LogicalPlan,
      overrideIfExists: Boolean): Unit = synchronized {
    val viewKey = formatTableName(name)
    if (!overrideIfExists && tempTables.contains(viewKey)) {
      throw new TempTableAlreadyExistsException(name)
    }
    tempTables.put(viewKey, tableDefinition)
  }

  /**
   * Create a global temporary view.
   */
  def createGlobalTempView(
      name: String,
      viewDefinition: LogicalPlan,
      overrideIfExists: Boolean): Unit = {
    val viewKey = formatTableName(name)
    globalTempViewManager.create(viewKey, viewDefinition, overrideIfExists)
  }

  /**
   * Alter the definition of a local/global temp view matching the given name, returns true if a
   * temp view is matched and altered, false otherwise.
*/
  def alterTempViewDefinition(
      name: TableIdentifier,
      viewDefinition: LogicalPlan): Boolean = synchronized {
    val viewName = formatTableName(name.table)
    name.database match {
      case None =>
        // Unqualified name: only an already registered local temp view can be altered.
        val isLocalTempView = tempTables.contains(viewName)
        if (isLocalTempView) {
          createTempView(viewName, viewDefinition, overrideIfExists = true)
        }
        isLocalTempView
      case Some(db) if formatDatabaseName(db) == globalTempViewManager.database =>
        globalTempViewManager.update(viewName, viewDefinition)
      case Some(_) =>
        false
    }
  }

  /**
   * Return a local temporary view exactly as it was stored.
   */
  def getTempView(name: String): Option[LogicalPlan] = synchronized {
    val viewKey = formatTableName(name)
    tempTables.get(viewKey)
  }

  /**
   * Return a global temporary view exactly as it was stored.
   */
  def getGlobalTempView(name: String): Option[LogicalPlan] = {
    val viewKey = formatTableName(name)
    globalTempViewManager.get(viewKey)
  }

  /**
   * Drop a local temporary view.
   *
   * Returns true if this view is dropped successfully, false otherwise.
   */
  def dropTempView(name: String): Boolean = synchronized {
    val removed = tempTables.remove(formatTableName(name))
    removed.isDefined
  }

  /**
   * Drop a global temporary view.
   *
   * Returns true if this view is dropped successfully, false otherwise.
   */
  def dropGlobalTempView(name: String): Boolean = {
    val viewKey = formatTableName(name)
    globalTempViewManager.remove(viewKey)
  }

  // -------------------------------------------------------------
  // | Methods that interact with temporary and metastore tables |
  // -------------------------------------------------------------

  /**
   * Retrieve the metadata of an existing temporary view or permanent table/view.
   *
   * If a database is specified in `name`, this will return the metadata of table/view in that
   * database.
* If no database is specified, this will first attempt to get the metadata of a temporary view
   * with the same name, then, if that does not exist, return the metadata of table/view in the
   * current database.
   */
  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
    val viewOrTable = formatTableName(name.table)

    // Describe a temporary view as a VIEW with empty storage and the plan's output schema.
    def viewMetadata(ident: TableIdentifier, plan: LogicalPlan): CatalogTable = {
      CatalogTable(
        identifier = ident,
        tableType = CatalogTableType.VIEW,
        storage = CatalogStorageFormat.empty,
        schema = plan.output.toStructType)
    }

    name.database match {
      case None =>
        getTempView(viewOrTable) match {
          case Some(plan) => viewMetadata(TableIdentifier(viewOrTable), plan)
          case None => getTableMetadata(name)
        }
      case Some(db) if formatDatabaseName(db) == globalTempViewManager.database =>
        globalTempViewManager.get(viewOrTable) match {
          case Some(plan) =>
            val ident = TableIdentifier(viewOrTable, Some(globalTempViewManager.database))
            viewMetadata(ident, plan)
          case None =>
            throw new NoSuchTableException(globalTempViewManager.database, viewOrTable)
        }
      case Some(_) =>
        getTableMetadata(name)
    }
  }

  /**
   * Rename a table.
   *
   * If a database is specified in `oldName`, this will rename the table in that database.
   * If no database is specified, this will first attempt to rename a temporary table with
   * the same name, then, if that does not exist, rename the table in the current database.
   *
   * This assumes the database specified in `newName` matches the one in `oldName`.
*/
  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
    val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
    // A destination database, when given, must be the same as the source database.
    newName.database.map(formatDatabaseName).filter(_ != db).foreach { newDb =>
      throw new AnalysisException(
        s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
    }

    val fromTable = formatTableName(oldName.table)
    val toTable = formatTableName(newName.table)
    if (db == globalTempViewManager.database) {
      globalTempViewManager.rename(fromTable, toTable)
    } else {
      requireDbExists(db)
      val isLocalTempView = oldName.database.isEmpty && tempTables.contains(fromTable)
      if (isLocalTempView) {
        // Local temporary views do not belong to a database, so none may be named here.
        newName.database.foreach { destDb =>
          throw new AnalysisException(
            s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': cannot specify database " +
              s"name '$destDb' in the destination table")
        }
        if (tempTables.contains(toTable)) {
          throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
            "destination table already exists")
        }
        // Move the stored plan from the old key to the new key.
        tempTables.remove(fromTable).foreach { plan =>
          tempTables.put(toTable, plan)
        }
      } else {
        requireTableExists(TableIdentifier(fromTable, Some(db)))
        requireTableNotExists(TableIdentifier(toTable, Some(db)))
        validateName(toTable)
        externalCatalog.renameTable(db, fromTable, toTable)
      }
    }
  }

  /**
   * Drop a table.
   *
   * If a database is specified in `name`, this will drop the table from that database.
   * If no database is specified, this will first attempt to drop a temporary table with
   * the same name, then, if that does not exist, drop the table from the current database.
*/
  def dropTable(
      name: TableIdentifier,
      ignoreIfNotExists: Boolean,
      purge: Boolean): Unit = synchronized {
    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
    val tableName = formatTableName(name.table)
    if (dbName == globalTempViewManager.database) {
      val removed = globalTempViewManager.remove(tableName)
      if (!(removed || ignoreIfNotExists)) {
        throw new NoSuchTableException(globalTempViewManager.database, tableName)
      }
    } else if (name.database.isEmpty && tempTables.contains(tableName)) {
      // Unqualified name that matches a local temporary view: drop the view only.
      tempTables.remove(tableName)
    } else {
      requireDbExists(dbName)
      // Existence is checked here, so the external catalog is always called with
      // ignoreIfNotExists = true. A missing table raises NoSuchTableException only when
      // the caller passed ignoreIfNotExists = false; otherwise it is silently skipped.
      if (tableExists(TableIdentifier(tableName, Option(dbName)))) {
        externalCatalog.dropTable(dbName, tableName, ignoreIfNotExists = true, purge = purge)
      } else if (!ignoreIfNotExists) {
        throw new NoSuchTableException(db = dbName, table = tableName)
      }
    }
  }

  /**
   * Return a [[LogicalPlan]] that represents the given table or view.
   *
   * If a database is specified in `name`, this will return the table/view from that database.
   * If no database is specified, this will first attempt to return a temporary table/view with
   * the same name, then, if that does not exist, return the table/view from the current database.
   *
   * Note that, the global temp view database is also valid here, this will return the global temp
   * view matching the given name.
   *
   * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
   * track the name of the view.
*/
  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
    synchronized {
      val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
      val tableName = formatTableName(name.table)
      val relationAlias = alias.getOrElse(tableName)
      if (dbName == globalTempViewManager.database) {
        globalTempViewManager.get(tableName) match {
          case Some(viewDef) => SubqueryAlias(relationAlias, viewDef, Some(name))
          case None => throw new NoSuchTableException(dbName, tableName)
        }
      } else if (name.database.isEmpty && tempTables.contains(tableName)) {
        // Unqualified name that matches a local temporary view.
        SubqueryAlias(relationAlias, tempTables(tableName), Option(name))
      } else {
        val metadata = externalCatalog.getTable(dbName, tableName)
        // Only relations whose table type is VIEW carry the view name.
        val view = Option(metadata.tableType).filter(_ == CatalogTableType.VIEW).map(_ => name)
        SubqueryAlias(relationAlias, SimpleCatalogRelation(dbName, metadata), view)
      }
    }
  }

  /**
   * Return whether a table with the specified name is a temporary table.
   *
   * Note: The temporary table cache is checked only when database is not
   * explicitly specified.
   */
  def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
    val viewName = formatTableName(name.table)
    name.database match {
      case None =>
        tempTables.contains(viewName)
      case Some(db) if formatDatabaseName(db) == globalTempViewManager.database =>
        globalTempViewManager.get(viewName).isDefined
      case Some(_) =>
        false
    }
  }

  /**
   * List all tables in the specified database, including local temporary tables.
   *
   * Note that, if the specified database is global temporary view database, we will list global
   * temporary views.
   */
  def listTables(db: String): Seq[TableIdentifier] = {
    listTables(db, "*")
  }

  /**
   * List all matching tables in the specified database, including local temporary tables.
   *
   * Note that, if the specified database is global temporary view database, we will list global
   * temporary views.
*/
  def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
    val dbName = formatDatabaseName(db)
    val globalDb = globalTempViewManager.database
    val dbTables: Seq[TableIdentifier] =
      if (dbName == globalDb) {
        globalTempViewManager.listViewNames(pattern).map(TableIdentifier(_, Some(globalDb)))
      } else {
        requireDbExists(dbName)
        externalCatalog.listTables(dbName, pattern).map(TableIdentifier(_, Some(dbName)))
      }
    // Local temporary views carry no database and are appended after the database's tables.
    val localTempViews = synchronized {
      val matching = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
      matching.map(viewName => TableIdentifier(viewName))
    }
    dbTables ++ localTempViews
  }

  /**
   * Refresh the plan of the temporary view matching `name`, if any.
   *
   * An unqualified name is looked up among the local temporary views; a name qualified with
   * the global temporary view database is looked up among the global temporary views. Names
   * qualified with any other database are ignored by this method.
   */
  def refreshTable(name: TableIdentifier): Unit = synchronized {
    name.database match {
      case None =>
        tempTables.get(formatTableName(name.table)).foreach(_.refresh())
      case Some(db) if formatDatabaseName(db) == globalTempViewManager.database =>
        globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
      case Some(_) =>
        // Qualified with a regular database: nothing to refresh here.
    }
  }

  /**
   * Drop all existing temporary tables.
   * For testing only.
   */
  def clearTempTables(): Unit = synchronized {
    tempTables.clear()
  }

  // ----------------------------------------------------------------------------
  // Partitions
  // ----------------------------------------------------------------------------
  // All methods in this category interact directly with the underlying catalog.
  // These methods are concerned with only metastore tables.
  // ----------------------------------------------------------------------------

  // TODO: We need to figure out how these methods interact with our data source
  // tables.
// For such tables, we do not store values of partitioning columns in
  // the metastore. For now, partition values of a data source table will be
  // automatically discovered when we load the table.

  /**
   * Create partitions in an existing table, assuming it exists.
   * If no database is specified, assume the table is in the current database.
   *
   * Every spec must name exactly the table's partition columns and contain no empty value.
   */
  def createPartitions(
      tableName: TableIdentifier,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit = {
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    requireTableExists(TableIdentifier(table, Option(dbName)))
    val specs = parts.map(_.spec)
    requireExactMatchedPartitionSpec(specs, getTableMetadata(tableName))
    requireNonEmptyValueInPartitionSpec(specs)
    externalCatalog.createPartitions(dbName, table, parts, ignoreIfExists)
  }

  /**
   * Drop partitions from a table, assuming they exist.
   * If no database is specified, assume the table is in the current database.
   *
   * Specs may be partial: their columns must be a subset of the table's partition columns,
   * and no value may be empty.
   */
  def dropPartitions(
      tableName: TableIdentifier,
      specs: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      retainData: Boolean): Unit = {
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    requireTableExists(TableIdentifier(table, Option(dbName)))
    val tableMetadata = getTableMetadata(tableName)
    requirePartialMatchedPartitionSpec(specs, tableMetadata)
    requireNonEmptyValueInPartitionSpec(specs)
    externalCatalog.dropPartitions(dbName, table, specs, ignoreIfNotExists, purge, retainData)
  }

  /**
   * Override the specs of one or many existing table partitions, assuming they exist.
   *
   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
   * If no database is specified, assume the table is in the current database.
*/
  def renamePartitions(
      tableName: TableIdentifier,
      specs: Seq[TablePartitionSpec],
      newSpecs: Seq[TablePartitionSpec]): Unit = {
    // The metadata lookup comes first; it already fails if the database or table is missing.
    val tableMetadata = getTableMetadata(tableName)
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    requireTableExists(TableIdentifier(table, Option(dbName)))
    // Column names are checked for both lists before empty values are checked for either.
    Seq(specs, newSpecs).foreach(requireExactMatchedPartitionSpec(_, tableMetadata))
    Seq(specs, newSpecs).foreach(requireNonEmptyValueInPartitionSpec)
    externalCatalog.renamePartitions(dbName, table, specs, newSpecs)
  }

  /**
   * Alter one or many table partitions whose specs that match those specified in `parts`,
   * assuming the partitions exist.
   *
   * If no database is specified, assume the table is in the current database.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    requireTableExists(TableIdentifier(table, Option(dbName)))
    val specs = parts.map(_.spec)
    requireExactMatchedPartitionSpec(specs, getTableMetadata(tableName))
    requireNonEmptyValueInPartitionSpec(specs)
    externalCatalog.alterPartitions(dbName, table, parts)
  }

  /**
   * Retrieve the metadata of a table partition, assuming it exists.
   * If no database is specified, assume the table is in the current database.
*/
  def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    requireTableExists(TableIdentifier(table, Option(dbName)))
    val singleSpec = Seq(spec)
    requireExactMatchedPartitionSpec(singleSpec, getTableMetadata(tableName))
    requireNonEmptyValueInPartitionSpec(singleSpec)
    externalCatalog.getPartition(dbName, table, spec)
  }

  /**
   * List the names of all partitions that belong to the specified table, assuming it exists.
   *
   * A partial partition spec may optionally be provided to filter the partitions returned.
   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
   * then a partial spec of (a='1') will return the first two only.
   */
  def listPartitionNames(
      tableName: TableIdentifier,
      partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    requireTableExists(TableIdentifier(table, Option(dbName)))
    partialSpec match {
      case Some(spec) =>
        // The filter spec is validated only when one is supplied.
        val singleSpec = Seq(spec)
        requirePartialMatchedPartitionSpec(singleSpec, getTableMetadata(tableName))
        requireNonEmptyValueInPartitionSpec(singleSpec)
      case None =>
    }
    externalCatalog.listPartitionNames(dbName, table, partialSpec)
  }

  /**
   * List the metadata of all partitions that belong to the specified table, assuming it exists.
   *
   * A partial partition spec may optionally be provided to filter the partitions returned.
   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
   * then a partial spec of (a='1') will return the first two only.
*/
  def listPartitions(
      tableName: TableIdentifier,
      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    requireTableExists(TableIdentifier(table, Option(dbName)))
    partialSpec match {
      case Some(spec) =>
        // The filter spec is validated only when one is supplied.
        val singleSpec = Seq(spec)
        requirePartialMatchedPartitionSpec(singleSpec, getTableMetadata(tableName))
        requireNonEmptyValueInPartitionSpec(singleSpec)
      case None =>
    }
    externalCatalog.listPartitions(dbName, table, partialSpec)
  }

  /**
   * List the metadata of partitions that belong to the specified table, assuming it exists, that
   * satisfy the given partition-pruning predicate expressions.
   */
  def listPartitionsByFilter(
      tableName: TableIdentifier,
      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
    val dbName = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(dbName)
    val qualified = TableIdentifier(table, Option(dbName))
    requireTableExists(qualified)
    externalCatalog.listPartitionsByFilter(dbName, table, predicates)
  }

  /**
   * Verify if the input partition spec has any empty value.
   *
   * Throws an [[AnalysisException]] describing the first spec that contains one.
   */
  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
    specs.find(_.values.exists(_.isEmpty)).foreach { invalid =>
      val spec = invalid.map { case (column, value) => column + "=" + value }
        .mkString("[", ", ", "]")
      throw new AnalysisException(
        s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
    }
  }

  /**
   * Verify if the input partition spec exactly matches the existing defined partition spec
   * The columns must be the same but the orders could be different.
*/
  private def requireExactMatchedPartitionSpec(
      specs: Seq[TablePartitionSpec],
      table: CatalogTable): Unit = {
    // Compare sorted column names so that the order used by the caller does not matter.
    val expected = table.partitionColumnNames.sorted
    specs.find(_.keys.toSeq.sorted != expected).foreach { mismatched =>
      val supplied = mismatched.keys.mkString(", ")
      val declared = table.partitionColumnNames.mkString(", ")
      throw new AnalysisException(
        s"Partition spec is invalid. The spec ($supplied) must match " +
          s"the partition spec ($declared) defined in " +
          s"table '${table.identifier}'")
    }
  }

  /**
   * Fail with an [[AnalysisException]] for the first spec in `specs` that names a column which
   * is not one of the partition columns of `table`. Specs that only cover a subset of the
   * partition columns are accepted.
   */
  private def requirePartialMatchedPartitionSpec(
      specs: Seq[TablePartitionSpec],
      table: CatalogTable): Unit = {
    val allowed = table.partitionColumnNames
    specs.find(_.keys.exists(column => !allowed.contains(column))).foreach { mismatched =>
      val supplied = mismatched.keys.mkString(", ")
      val declared = table.partitionColumnNames.mkString(", ")
      throw new AnalysisException(
        s"Partition spec is invalid. The spec ($supplied) must be contained " +
          s"within the partition spec ($declared) defined " +
          s"in table '${table.identifier}'")
    }
  }

  // ----------------------------------------------------------------------------
  // Functions
  // ----------------------------------------------------------------------------
  // There are two kinds of functions, temporary functions and metastore
  // functions (permanent UDFs). Temporary functions are isolated across
  // sessions. Metastore functions can be used across multiple sessions as
  // their metadata is persisted in the underlying catalog.
// ----------------------------------------------------------------------------

  // -------------------------------------------------------
  // | Methods that interact with metastore functions only |
  // -------------------------------------------------------

  /**
   * Register `funcDefinition` in the external catalog.
   *
   * The target database is the one named in the definition's identifier, or the current
   * database when none is named; it must exist. If the function already exists, this either
   * does nothing (`ignoreIfExists = true`) or throws a [[FunctionAlreadyExistsException]].
   */
  def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
    val requestedDb = funcDefinition.identifier.database.getOrElse(getCurrentDatabase)
    val dbName = formatDatabaseName(requestedDb)
    requireDbExists(dbName)
    val qualifiedId = FunctionIdentifier(funcDefinition.identifier.funcName, Some(dbName))
    if (functionExists(qualifiedId)) {
      if (!ignoreIfExists) {
        throw new FunctionAlreadyExistsException(db = dbName, func = qualifiedId.toString)
      }
    } else {
      // Store the definition under the fully qualified identifier.
      externalCatalog.createFunction(dbName, funcDefinition.copy(identifier = qualifiedId))
    }
  }

  /**
   * Remove a function from the external catalog.
   *
   * The database defaults to the current one when `name` does not carry one; it must exist.
   * If the function does not exist, this either does nothing (`ignoreIfNotExists = true`) or
   * throws a [[NoSuchFunctionException]].
   */
  def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
    val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    requireDbExists(dbName)
    val qualifiedId = name.copy(database = Some(dbName))
    if (!functionExists(qualifiedId)) {
      if (!ignoreIfNotExists) {
        throw new NoSuchFunctionException(db = dbName, func = qualifiedId.toString)
      }
    } else {
      // TODO: registry should just take in FunctionIdentifier for type safety
      val registryKey = qualifiedId.unquotedString
      // A permanent function is loaded into the FunctionRegistry when it is first used,
      // so a loaded copy has to be dropped from there as well.
      if (functionRegistry.functionExists(registryKey)) {
        functionRegistry.dropFunction(registryKey)
      }
      externalCatalog.dropFunction(dbName, name.funcName)
    }
  }

  /**
   * Fetch the definition of a function from the external catalog.
   *
   * The function is looked up in the database named in `name`, or in the current database
   * when `name` does not carry one. The database must exist.
   */
  def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
    val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    requireDbExists(dbName)
    externalCatalog.getFunction(dbName, name.funcName)
  }

  /**
   * Tell whether `name` is known either to the function registry (under its unquoted string
   * form) or to the external catalog. The database, defaulting to the current one, must exist.
   */
  def functionExists(name: FunctionIdentifier): Boolean = {
    val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    requireDbExists(dbName)
    // The external catalog is only consulted when the registry does not know the name.
    if (functionRegistry.functionExists(name.unquotedString)) {
      true
    } else {
      externalCatalog.functionExists(dbName, name.funcName)
    }
  }

  // ----------------------------------------------------------------
  // | Methods that interact with temporary and metastore functions |
  // ----------------------------------------------------------------

  /**
   * Build a [[FunctionBuilder]] for the function implemented by `functionClassName`.
   *
   * This implementation does not support it and always throws an
   * [[UnsupportedOperationException]].
   */
  def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
    // TODO: at least support UDAFs here
    throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
  }

  /**
   * Loads resources such as JARs and Files for a function. Every resource is represented
   * by a tuple (resource type, resource uri).
*/
  def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
    for (resource <- resources) {
      functionResourceLoader.loadResource(resource)
    }
  }

  /**
   * Register a temporary function in the function registry under `name`.
   *
   * Throws a [[TempFunctionAlreadyExistsException]] when a builder is already registered
   * under `name` and `ignoreIfExists` is false. In every other case `registerFunction` is
   * invoked, including when the name was already taken.
   */
  def createTempFunction(
      name: String,
      info: ExpressionInfo,
      funcDefinition: FunctionBuilder,
      ignoreIfExists: Boolean): Unit = {
    val alreadyRegistered = functionRegistry.lookupFunctionBuilder(name).isDefined
    if (alreadyRegistered && !ignoreIfExists) {
      throw new TempFunctionAlreadyExistsException(name)
    }
    functionRegistry.registerFunction(name, info, funcDefinition)
  }

  /**
   * Remove a temporary function from the function registry.
   *
   * Throws a [[NoSuchTempFunctionException]] when the registry reports that nothing was
   * dropped and `ignoreIfNotExists` is false.
   */
  def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
    val dropped = functionRegistry.dropFunction(name)
    if (!dropped && !ignoreIfNotExists) {
      throw new NoSuchTempFunctionException(name)
    }
  }

  /**
   * Tell whether `name` refers to a temporary function; unknown functions yield false.
   */
  def isTemporaryFunction(name: FunctionIdentifier): Boolean = {
    // copied from HiveSessionCatalog
    val hiveFunctions = Seq("histogram_numeric")

    // A temporary function is registered in functionRegistry without a database name,
    // and is neither a built-in function nor one of the Hive functions listed above.
    name.database match {
      case Some(_) => false
      case None =>
        val funcName = name.funcName
        functionRegistry.functionExists(funcName) &&
          !FunctionRegistry.builtin.functionExists(funcName) &&
          !hiveFunctions.contains(funcName.toLowerCase)
    }
  }

  /**
   * Abort a function lookup by throwing a [[NoSuchFunctionException]] that names the
   * current database and the given function name.
   */
  protected def failFunctionLookup(name: String): Nothing = {
    throw new NoSuchFunctionException(db = currentDb, func = name)
  }

  /**
   * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
*/
  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
    // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
    // If the name itself is not qualified, the current database is used.
    val db = formatDatabaseName(name.database.getOrElse(currentDb))
    val qualifiedName = name.copy(database = Some(db))
    // Registry entries win: first under the bare name, then under `db.funcName`.
    functionRegistry.lookupFunction(name.funcName)
      .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
      .getOrElse {
        // Not in the registry: fall back to the definition stored in the external catalog.
        requireDbExists(db)
        if (externalCatalog.functionExists(db, name.funcName)) {
          val metadata = externalCatalog.getFunction(db, name.funcName)
          new ExpressionInfo(metadata.className, db, qualifiedName.identifier)
        } else {
          failFunctionLookup(name.funcName)
        }
      }
  }

  /**
   * Return an [[Expression]] that represents the specified function applied to `children`,
   * assuming the function exists.
   *
   * Resolution proceeds in three steps:
   *  1. If `name` has no database and the FunctionRegistry knows the bare function name,
   *     the expression is built from that registry entry.
   *  2. Otherwise the name is qualified with its own database, or with the current database
   *     when it has none. If the registry knows `databaseName.functionName`, that entry is
   *     used.
   *  3. Otherwise the function is fetched from the external catalog, using the database of
   *     the qualified name, its resources are loaded, a builder is created with
   *     [[makeFunctionBuilder]] and registered as `databaseName.functionName`, and the
   *     expression is built from the new registry entry.
   *
   * If the external catalog lookup fails with an [[AnalysisException]] or a
   * [[NoSuchPermanentFunctionException]], the failure is reported through
   * `failFunctionLookup`.
   */
  def lookupFunction(
      name: FunctionIdentifier,
      children: Seq[Expression]): Expression = synchronized {
    // Note: the implementation of this function is a little bit convoluted.
    // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
    // (built-in, temp, and external).
    if (name.database.isEmpty && functionRegistry.functionExists(name.funcName)) {
      // This function has been already loaded into the function registry.
      functionRegistry.lookupFunction(name.funcName, children)
    } else {
      // If the name itself is not qualified, add the current database to it.
      val db = formatDatabaseName(name.database.getOrElse(currentDb))
      val qualifiedName = name.copy(database = Some(db))
      val registryKey = qualifiedName.unquotedString

      if (functionRegistry.functionExists(registryKey)) {
        // This function has been already loaded into the function registry.
        // Unlike the above block, we find this function by using the qualified name.
        functionRegistry.lookupFunction(registryKey, children)
      } else {
        // The function has not been loaded to the function registry, which means
        // that the function is a permanent function (if it actually has been registered
        // in the metastore). We need to first put the function in the FunctionRegistry.
        // TODO: why not just check whether the function exists first?
        val catalogFunction = try {
          // Fetch from the database the name resolves to, not from the current database:
          // the two differ whenever the caller passes a name qualified with another database.
          externalCatalog.getFunction(db, name.funcName)
        } catch {
          case _: AnalysisException | _: NoSuchPermanentFunctionException =>
            failFunctionLookup(name.funcName)
        }
        loadFunctionResources(catalogFunction.resources)
        // Please note that qualifiedName is provided by the user. However,
        // catalogFunction.identifier.unquotedString is returned by the underlying
        // catalog. So, it is possible that qualifiedName is not exactly the same as
        // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
        // At here, we preserve the input from the user.
        val info = new ExpressionInfo(catalogFunction.className, db, qualifiedName.funcName)
        val builder = makeFunctionBuilder(registryKey, catalogFunction.className)
        createTempFunction(registryKey, info, builder, ignoreIfExists = false)
        // Now, we need to create the Expression.
        functionRegistry.lookupFunction(registryKey, children)
      }
    }
  }

  /**
   * List all functions in the specified database together with every function held by the
   * function registry. Each identifier is paired with the scope in which it was defined:
   * "SYSTEM" or "USER".
   */
  def listFunctions(db: String): Seq[(FunctionIdentifier, String)] = listFunctions(db, "*")

  /**
   * List the functions matching `pattern` in the specified database together with the
   * matching functions held by the function registry. Each identifier is paired with
   * "SYSTEM" when its name is in `FunctionRegistry.functionSet` and with "USER" otherwise.
   *
   * The database must exist. Catalog functions come first and carry the database name;
   * registry functions follow and carry none.
   */
  def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
    val dbName = formatDatabaseName(db)
    requireDbExists(dbName)
    val dbFunctions = externalCatalog.listFunctions(dbName, pattern)
      .map { f => FunctionIdentifier(f, Some(dbName)) }
    val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern)
      .map { f => FunctionIdentifier(f) }
    (dbFunctions ++ loadedFunctions).map { f =>
      val scope = if (FunctionRegistry.functionSet.contains(f.funcName)) "SYSTEM" else "USER"
      (f, scope)
    }
  }


  // -----------------
  // | Other methods |
  // -----------------

  /**
   * Drop all existing databases (except "default"), tables, partitions and functions,
   * and set the current database to "default".
   *
   * This is mainly used for tests.
*/
  def reset(): Unit = synchronized {
    setCurrentDatabase(DEFAULT_DATABASE)

    // Every database other than the default one is dropped together with its contents.
    val nonDefaultDbs = listDatabases().filterNot(_ == DEFAULT_DATABASE)
    for (db <- nonDefaultDbs) {
      dropDatabase(db, ignoreIfNotExists = false, cascade = true)
    }

    // The default database itself is kept, so it is emptied item by item.
    for (table <- listTables(DEFAULT_DATABASE)) {
      dropTable(table, ignoreIfNotExists = false, purge = false)
    }
    listFunctions(DEFAULT_DATABASE).foreach { case (func, _) =>
      // Identifiers with a database are dropped through dropFunction; the rest are
      // dropped from the function registry.
      func.database match {
        case Some(_) => dropFunction(func, ignoreIfNotExists = false)
        case None => dropTempFunction(func.funcName, ignoreIfNotExists = false)
      }
    }

    tempTables.clear()
    globalTempViewManager.clear()
    functionRegistry.clear()

    // restore built-in functions
    val builtin = FunctionRegistry.builtin
    builtin.listFunction().foreach { fn =>
      val info = builtin.lookupFunction(fn)
      val builder = builtin.lookupFunctionBuilder(fn)
      require(info.isDefined, s"built-in function '$fn' is missing expression info")
      require(builder.isDefined, s"built-in function '$fn' is missing function builder")
      functionRegistry.registerFunction(fn, info.get, builder.get)
    }
  }

}