1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.sql.hive.client 19 20import java.io.{ByteArrayOutputStream, File, PrintStream} 21 22import org.apache.hadoop.conf.Configuration 23import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat 24import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe 25import org.apache.hadoop.mapred.TextInputFormat 26 27import org.apache.spark.SparkFunSuite 28import org.apache.spark.internal.Logging 29import org.apache.spark.sql.{AnalysisException, Row} 30import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} 31import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException} 32import org.apache.spark.sql.catalyst.catalog._ 33import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} 34import org.apache.spark.sql.catalyst.util.quietly 35import org.apache.spark.sql.hive.HiveUtils 36import org.apache.spark.sql.hive.test.TestHiveSingleton 37import org.apache.spark.sql.test.SQLTestUtils 38import org.apache.spark.sql.types.IntegerType 39import org.apache.spark.sql.types.StructType 40import org.apache.spark.tags.ExtendedHiveTest 41import org.apache.spark.util.{MutableURLClassLoader, Utils} 42 43/** 44 * A simple set of tests that call the methods of a [[HiveClient]], loading different version 45 * of hive from maven central. These tests are simple in that they are mostly just testing to make 46 * sure that reflective calls are not throwing NoSuchMethod error, but the actually functionality 47 * is not fully tested. 48 */ 49@ExtendedHiveTest 50class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging { 51 52 private val clientBuilder = new HiveClientBuilder 53 import clientBuilder.buildClient 54 55 test("success sanity check") { 56 val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration()) 57 val db = new CatalogDatabase("default", "desc", "loc", Map()) 58 badClient.createDatabase(db, ignoreIfExists = true) 59 } 60 61 test("hadoop configuration preserved") { 62 val hadoopConf = new Configuration() 63 hadoopConf.set("test", "success") 64 val client = buildClient(HiveUtils.hiveExecutionVersion, hadoopConf) 65 assert("success" === client.getConf("test", null)) 66 } 67 68 private def getNestedMessages(e: Throwable): String = { 69 var causes = "" 70 var lastException = e 71 while (lastException != null) { 72 causes += lastException.toString + "\n" 73 lastException = lastException.getCause 74 } 75 causes 76 } 77 78 private val emptyDir = Utils.createTempDir().getCanonicalPath 79 80 // Its actually pretty easy to mess things up and have all of your tests "pass" by accidentally 81 // connecting to an auto-populated, in-process metastore. Let's make sure we are getting the 82 // versions right by forcing a known compatibility failure. 83 // TODO: currently only works on mysql where we manually create the schema... 84 ignore("failure sanity check") { 85 val e = intercept[Throwable] { 86 val badClient = quietly { buildClient("13", new Configuration()) } 87 } 88 assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'") 89 } 90 91 private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2") 92 93 private var client: HiveClient = null 94 95 versions.foreach { version => 96 test(s"$version: create client") { 97 client = null 98 System.gc() // Hack to avoid SEGV on some JVM versions. 99 val hadoopConf = new Configuration() 100 hadoopConf.set("test", "success") 101 client = buildClient(version, hadoopConf) 102 } 103 104 def table(database: String, tableName: String): CatalogTable = { 105 CatalogTable( 106 identifier = TableIdentifier(tableName, Some(database)), 107 tableType = CatalogTableType.MANAGED, 108 schema = new StructType().add("key", "int"), 109 storage = CatalogStorageFormat( 110 locationUri = None, 111 inputFormat = Some(classOf[TextInputFormat].getName), 112 outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName), 113 serde = Some(classOf[LazySimpleSerDe].getName()), 114 compressed = false, 115 properties = Map.empty 116 )) 117 } 118 119 /////////////////////////////////////////////////////////////////////////// 120 // Database related API 121 /////////////////////////////////////////////////////////////////////////// 122 123 val tempDatabasePath = Utils.createTempDir().getCanonicalPath 124 125 test(s"$version: createDatabase") { 126 val defaultDB = CatalogDatabase("default", "desc", "loc", Map()) 127 client.createDatabase(defaultDB, ignoreIfExists = true) 128 val tempDB = CatalogDatabase( 129 "temporary", description = "test create", tempDatabasePath, Map()) 130 client.createDatabase(tempDB, ignoreIfExists = true) 131 } 132 133 test(s"$version: setCurrentDatabase") { 134 client.setCurrentDatabase("default") 135 } 136 137 test(s"$version: getDatabase") { 138 // No exception should be thrown 139 client.getDatabase("default") 140 intercept[NoSuchDatabaseException](client.getDatabase("nonexist")) 141 } 142 143 test(s"$version: databaseExists") { 144 assert(client.databaseExists("default") == true) 145 assert(client.databaseExists("nonexist") == false) 146 } 147 148 test(s"$version: listDatabases") { 149 assert(client.listDatabases("defau.*") == Seq("default")) 150 } 151 152 test(s"$version: alterDatabase") { 153 val database = client.getDatabase("temporary").copy(properties = Map("flag" -> "true")) 154 client.alterDatabase(database) 155 assert(client.getDatabase("temporary").properties.contains("flag")) 156 } 157 158 test(s"$version: dropDatabase") { 159 assert(client.databaseExists("temporary") == true) 160 client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true) 161 assert(client.databaseExists("temporary") == false) 162 } 163 164 /////////////////////////////////////////////////////////////////////////// 165 // Table related API 166 /////////////////////////////////////////////////////////////////////////// 167 168 test(s"$version: createTable") { 169 client.createTable(table("default", tableName = "src"), ignoreIfExists = false) 170 client.createTable(table("default", "temporary"), ignoreIfExists = false) 171 } 172 173 test(s"$version: loadTable") { 174 client.loadTable( 175 emptyDir, 176 tableName = "src", 177 replace = false, 178 holdDDLTime = false) 179 } 180 181 test(s"$version: tableExists") { 182 // No exception should be thrown 183 assert(client.tableExists("default", "src")) 184 assert(!client.tableExists("default", "nonexistent")) 185 } 186 187 test(s"$version: getTable") { 188 // No exception should be thrown 189 client.getTable("default", "src") 190 } 191 192 test(s"$version: getTableOption") { 193 assert(client.getTableOption("default", "src").isDefined) 194 } 195 196 test(s"$version: alterTable(table: CatalogTable)") { 197 val newTable = client.getTable("default", "src").copy(properties = Map("changed" -> "")) 198 client.alterTable(newTable) 199 assert(client.getTable("default", "src").properties.contains("changed")) 200 } 201 202 test(s"$version: alterTable(tableName: String, table: CatalogTable)") { 203 val newTable = client.getTable("default", "src").copy(properties = Map("changedAgain" -> "")) 204 client.alterTable("src", newTable) 205 assert(client.getTable("default", "src").properties.contains("changedAgain")) 206 } 207 208 test(s"$version: listTables(database)") { 209 assert(client.listTables("default") === Seq("src", "temporary")) 210 } 211 212 test(s"$version: listTables(database, pattern)") { 213 assert(client.listTables("default", pattern = "src") === Seq("src")) 214 assert(client.listTables("default", pattern = "nonexist").isEmpty) 215 } 216 217 test(s"$version: dropTable") { 218 val versionsWithoutPurge = versions.takeWhile(_ != "0.14") 219 // First try with the purge option set. This should fail if the version is < 0.14, in which 220 // case we check the version and try without it. 221 try { 222 client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false, 223 purge = true) 224 assert(!versionsWithoutPurge.contains(version)) 225 } catch { 226 case _: UnsupportedOperationException => 227 assert(versionsWithoutPurge.contains(version)) 228 client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false, 229 purge = false) 230 } 231 assert(client.listTables("default") === Seq("src")) 232 } 233 234 /////////////////////////////////////////////////////////////////////////// 235 // Partition related API 236 /////////////////////////////////////////////////////////////////////////// 237 238 val storageFormat = CatalogStorageFormat( 239 locationUri = None, 240 inputFormat = None, 241 outputFormat = None, 242 serde = None, 243 compressed = false, 244 properties = Map.empty) 245 246 test(s"$version: sql create partitioned table") { 247 client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)") 248 } 249 250 val testPartitionCount = 2 251 252 test(s"$version: createPartitions") { 253 val partitions = (1 to testPartitionCount).map { key2 => 254 CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat) 255 } 256 client.createPartitions( 257 "default", "src_part", partitions, ignoreIfExists = true) 258 } 259 260 test(s"$version: getPartitionNames(catalogTable)") { 261 val partitionNames = (1 to testPartitionCount).map(key2 => s"key1=1/key2=$key2") 262 assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part"))) 263 } 264 265 test(s"$version: getPartitions(catalogTable)") { 266 assert(testPartitionCount == 267 client.getPartitions(client.getTable("default", "src_part")).size) 268 } 269 270 test(s"$version: getPartitionsByFilter") { 271 // Only one partition [1, 1] for key2 == 1 272 val result = client.getPartitionsByFilter(client.getTable("default", "src_part"), 273 Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1)))) 274 275 // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition. 276 if (version != "0.12") { 277 assert(result.size == 1) 278 } else { 279 assert(result.size == testPartitionCount) 280 } 281 } 282 283 test(s"$version: getPartition") { 284 // No exception should be thrown 285 client.getPartition("default", "src_part", Map("key1" -> "1", "key2" -> "2")) 286 } 287 288 test(s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") { 289 val partition = client.getPartitionOption( 290 "default", "src_part", Map("key1" -> "1", "key2" -> "2")) 291 assert(partition.isDefined) 292 } 293 294 test(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") { 295 val partition = client.getPartitionOption( 296 client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2")) 297 assert(partition.isDefined) 298 } 299 300 test(s"$version: getPartitions(db: String, table: String)") { 301 assert(testPartitionCount == client.getPartitions("default", "src_part", None).size) 302 } 303 304 test(s"$version: loadPartition") { 305 val partSpec = new java.util.LinkedHashMap[String, String] 306 partSpec.put("key1", "1") 307 partSpec.put("key2", "2") 308 309 client.loadPartition( 310 emptyDir, 311 "default", 312 "src_part", 313 partSpec, 314 replace = false, 315 holdDDLTime = false, 316 inheritTableSpecs = false) 317 } 318 319 test(s"$version: loadDynamicPartitions") { 320 val partSpec = new java.util.LinkedHashMap[String, String] 321 partSpec.put("key1", "1") 322 partSpec.put("key2", "") // Dynamic partition 323 324 client.loadDynamicPartitions( 325 emptyDir, 326 "default", 327 "src_part", 328 partSpec, 329 replace = false, 330 numDP = 1, 331 holdDDLTime = false) 332 } 333 334 test(s"$version: renamePartitions") { 335 val oldSpec = Map("key1" -> "1", "key2" -> "1") 336 val newSpec = Map("key1" -> "1", "key2" -> "3") 337 client.renamePartitions("default", "src_part", Seq(oldSpec), Seq(newSpec)) 338 339 // Checks the existence of the new partition (key1 = 1, key2 = 3) 340 assert(client.getPartitionOption("default", "src_part", newSpec).isDefined) 341 } 342 343 test(s"$version: alterPartitions") { 344 val spec = Map("key1" -> "1", "key2" -> "2") 345 val newLocation = Utils.createTempDir().getPath() 346 val storage = storageFormat.copy( 347 locationUri = Some(newLocation), 348 // needed for 0.12 alter partitions 349 serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) 350 val partition = CatalogTablePartition(spec, storage) 351 client.alterPartitions("default", "src_part", Seq(partition)) 352 assert(client.getPartition("default", "src_part", spec) 353 .storage.locationUri == Some(newLocation)) 354 } 355 356 test(s"$version: dropPartitions") { 357 val spec = Map("key1" -> "1", "key2" -> "3") 358 val versionsWithoutPurge = versions.takeWhile(_ != "1.2") 359 // Similar to dropTable; try with purge set, and if it fails, make sure we're running 360 // with a version that is older than the minimum (1.2 in this case). 361 try { 362 client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, 363 purge = true, retainData = false) 364 assert(!versionsWithoutPurge.contains(version)) 365 } catch { 366 case _: UnsupportedOperationException => 367 assert(versionsWithoutPurge.contains(version)) 368 client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, 369 purge = false, retainData = false) 370 } 371 372 assert(client.getPartitionOption("default", "src_part", spec).isEmpty) 373 } 374 375 /////////////////////////////////////////////////////////////////////////// 376 // Function related API 377 /////////////////////////////////////////////////////////////////////////// 378 379 def function(name: String, className: String): CatalogFunction = { 380 CatalogFunction( 381 FunctionIdentifier(name, Some("default")), className, Seq.empty[FunctionResource]) 382 } 383 384 test(s"$version: createFunction") { 385 val functionClass = "org.apache.spark.MyFunc1" 386 if (version == "0.12") { 387 // Hive 0.12 doesn't support creating permanent functions 388 intercept[AnalysisException] { 389 client.createFunction("default", function("func1", functionClass)) 390 } 391 } else { 392 client.createFunction("default", function("func1", functionClass)) 393 } 394 } 395 396 test(s"$version: functionExists") { 397 if (version == "0.12") { 398 // Hive 0.12 doesn't allow customized permanent functions 399 assert(client.functionExists("default", "func1") == false) 400 } else { 401 assert(client.functionExists("default", "func1") == true) 402 } 403 } 404 405 test(s"$version: renameFunction") { 406 if (version == "0.12") { 407 // Hive 0.12 doesn't allow customized permanent functions 408 intercept[NoSuchPermanentFunctionException] { 409 client.renameFunction("default", "func1", "func2") 410 } 411 } else { 412 client.renameFunction("default", "func1", "func2") 413 assert(client.functionExists("default", "func2") == true) 414 } 415 } 416 417 test(s"$version: alterFunction") { 418 val functionClass = "org.apache.spark.MyFunc2" 419 if (version == "0.12") { 420 // Hive 0.12 doesn't allow customized permanent functions 421 intercept[NoSuchPermanentFunctionException] { 422 client.alterFunction("default", function("func2", functionClass)) 423 } 424 } else { 425 client.alterFunction("default", function("func2", functionClass)) 426 } 427 } 428 429 test(s"$version: getFunction") { 430 if (version == "0.12") { 431 // Hive 0.12 doesn't allow customized permanent functions 432 intercept[NoSuchPermanentFunctionException] { 433 client.getFunction("default", "func2") 434 } 435 } else { 436 // No exception should be thrown 437 val func = client.getFunction("default", "func2") 438 assert(func.className == "org.apache.spark.MyFunc2") 439 } 440 } 441 442 test(s"$version: getFunctionOption") { 443 if (version == "0.12") { 444 // Hive 0.12 doesn't allow customized permanent functions 445 assert(client.getFunctionOption("default", "func2").isEmpty) 446 } else { 447 assert(client.getFunctionOption("default", "func2").isDefined) 448 assert(client.getFunctionOption("default", "the_func_not_exists").isEmpty) 449 } 450 } 451 452 test(s"$version: listFunctions") { 453 if (version == "0.12") { 454 // Hive 0.12 doesn't allow customized permanent functions 455 assert(client.listFunctions("default", "fun.*").isEmpty) 456 } else { 457 assert(client.listFunctions("default", "fun.*").size == 1) 458 } 459 } 460 461 test(s"$version: dropFunction") { 462 if (version == "0.12") { 463 // Hive 0.12 doesn't support creating permanent functions 464 intercept[NoSuchPermanentFunctionException] { 465 client.dropFunction("default", "func2") 466 } 467 } else { 468 // No exception should be thrown 469 client.dropFunction("default", "func2") 470 assert(client.listFunctions("default", "fun.*").size == 0) 471 } 472 } 473 474 /////////////////////////////////////////////////////////////////////////// 475 // SQL related API 476 /////////////////////////////////////////////////////////////////////////// 477 478 test(s"$version: sql set command") { 479 client.runSqlHive("SET spark.sql.test.key=1") 480 } 481 482 test(s"$version: sql create index and reset") { 483 client.runSqlHive("CREATE TABLE indexed_table (key INT)") 484 client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + 485 "as 'COMPACT' WITH DEFERRED REBUILD") 486 } 487 488 /////////////////////////////////////////////////////////////////////////// 489 // Miscellaneous API 490 /////////////////////////////////////////////////////////////////////////// 491 492 test(s"$version: version") { 493 assert(client.version.fullVersion.startsWith(version)) 494 } 495 496 test(s"$version: getConf") { 497 assert("success" === client.getConf("test", null)) 498 } 499 500 test(s"$version: setOut") { 501 client.setOut(new PrintStream(new ByteArrayOutputStream())) 502 } 503 504 test(s"$version: setInfo") { 505 client.setInfo(new PrintStream(new ByteArrayOutputStream())) 506 } 507 508 test(s"$version: setError") { 509 client.setError(new PrintStream(new ByteArrayOutputStream())) 510 } 511 512 test(s"$version: newSession") { 513 val newClient = client.newSession() 514 assert(newClient != null) 515 } 516 517 test(s"$version: withHiveState and addJar") { 518 val newClassPath = "." 519 client.addJar(newClassPath) 520 client.withHiveState { 521 // No exception should be thrown. 522 // withHiveState changes the classloader to MutableURLClassLoader 523 val classLoader = Thread.currentThread().getContextClassLoader 524 .asInstanceOf[MutableURLClassLoader] 525 526 val urls = classLoader.getURLs() 527 urls.contains(new File(newClassPath).toURI.toURL) 528 } 529 } 530 531 test(s"$version: reset") { 532 // Clears all database, tables, functions... 533 client.reset() 534 assert(client.listTables("default").isEmpty) 535 } 536 537 /////////////////////////////////////////////////////////////////////////// 538 // End-To-End tests 539 /////////////////////////////////////////////////////////////////////////// 540 541 test(s"$version: CREATE TABLE AS SELECT") { 542 withTable("tbl") { 543 spark.sql("CREATE TABLE tbl AS SELECT 1 AS a") 544 assert(spark.table("tbl").collect().toSeq == Seq(Row(1))) 545 } 546 } 547 548 test(s"$version: Delete the temporary staging directory and files after each insert") { 549 withTempDir { tmpDir => 550 withTable("tab") { 551 spark.sql( 552 s""" 553 |CREATE TABLE tab(c1 string) 554 |location '${tmpDir.toURI.toString}' 555 """.stripMargin) 556 557 (1 to 3).map { i => 558 spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") 559 } 560 def listFiles(path: File): List[String] = { 561 val dir = path.listFiles() 562 val folders = dir.filter(_.isDirectory).toList 563 val filePaths = dir.map(_.getName).toList 564 folders.flatMap(listFiles) ++: filePaths 565 } 566 val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil 567 assert(listFiles(tmpDir).sorted == expectedFiles) 568 } 569 } 570 } 571 572 // TODO: add more tests. 573 } 574} 575