/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.client

import java.io.{ByteArrayOutputStream, File, PrintStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.{MutableURLClassLoader, Utils}

/**
 * A simple set of tests that call the methods of a [[HiveClient]], loading different versions
 * of Hive from Maven Central.  These tests are simple in that they mostly just check that
 * reflective calls do not throw a NoSuchMethodError, but the actual functionality is not
 * fully tested.
 */
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {

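  // Note: `buildClient(version, hadoopConf)` constructs a HiveClient backed by the requested
  // Hive metastore version, loading the corresponding Hive jars from Maven Central.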
  private val clientBuilder = new HiveClientBuilder
  import clientBuilder.buildClient

  test("success sanity check") {
    val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
    val db = new CatalogDatabase("default", "desc", "loc", Map())
    badClient.createDatabase(db, ignoreIfExists = true)
  }

  test("hadoop configuration preserved") {
    val hadoopConf = new Configuration()
    hadoopConf.set("test", "success")
    val client = buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
    assert("success" === client.getConf("test", null))
  }

  private def getNestedMessages(e: Throwable): String = {
    var causes = ""
    var lastException = e
    while (lastException != null) {
      causes += lastException.toString + "\n"
      lastException = lastException.getCause
    }
    causes
  }

  private val emptyDir = Utils.createTempDir().getCanonicalPath

  // It's actually pretty easy to mess things up and have all of your tests "pass" by accidentally
  // connecting to an auto-populated, in-process metastore.  Let's make sure we are getting the
  // versions right by forcing a known compatibility failure.
  // TODO: currently only works on mysql where we manually create the schema...
  ignore("failure sanity check") {
    val e = intercept[Throwable] {
      val badClient = quietly { buildClient("13", new Configuration()) }
    }
    assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
  }

  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2")

  private var client: HiveClient = null

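  // Each version in `versions` gets its own full pass through the tests below. Within a version
  // the tests run in declaration order and share the mutable `client` built in the
  // "create client" test, since later tests depend on catalog state created by earlier ones.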
  versions.foreach { version =>
    test(s"$version: create client") {
      client = null
      System.gc() // Hack to avoid SEGV on some JVM versions.
      val hadoopConf = new Configuration()
      hadoopConf.set("test", "success")
      client = buildClient(version, hadoopConf)
    }

    def table(database: String, tableName: String): CatalogTable = {
      CatalogTable(
        identifier = TableIdentifier(tableName, Some(database)),
        tableType = CatalogTableType.MANAGED,
        schema = new StructType().add("key", "int"),
        storage = CatalogStorageFormat(
          locationUri = None,
          inputFormat = Some(classOf[TextInputFormat].getName),
          outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
          serde = Some(classOf[LazySimpleSerDe].getName()),
          compressed = false,
          properties = Map.empty
        ))
    }

    ///////////////////////////////////////////////////////////////////////////
    // Database related API
    ///////////////////////////////////////////////////////////////////////////

    val tempDatabasePath = Utils.createTempDir().getCanonicalPath

    test(s"$version: createDatabase") {
      val defaultDB = CatalogDatabase("default", "desc", "loc", Map())
      client.createDatabase(defaultDB, ignoreIfExists = true)
      val tempDB = CatalogDatabase(
        "temporary", description = "test create", tempDatabasePath, Map())
      client.createDatabase(tempDB, ignoreIfExists = true)
    }

    test(s"$version: setCurrentDatabase") {
      client.setCurrentDatabase("default")
    }

    test(s"$version: getDatabase") {
      // No exception should be thrown
      client.getDatabase("default")
      intercept[NoSuchDatabaseException](client.getDatabase("nonexist"))
    }

    test(s"$version: databaseExists") {
      assert(client.databaseExists("default") == true)
      assert(client.databaseExists("nonexist") == false)
    }

    test(s"$version: listDatabases") {
      assert(client.listDatabases("defau.*") == Seq("default"))
    }

    test(s"$version: alterDatabase") {
      val database = client.getDatabase("temporary").copy(properties = Map("flag" -> "true"))
      client.alterDatabase(database)
      assert(client.getDatabase("temporary").properties.contains("flag"))
    }

    test(s"$version: dropDatabase") {
      assert(client.databaseExists("temporary") == true)
      client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true)
      assert(client.databaseExists("temporary") == false)
    }

    ///////////////////////////////////////////////////////////////////////////
    // Table related API
    ///////////////////////////////////////////////////////////////////////////

    test(s"$version: createTable") {
      client.createTable(table("default", tableName = "src"), ignoreIfExists = false)
      client.createTable(table("default", "temporary"), ignoreIfExists = false)
    }

    test(s"$version: loadTable") {
      client.loadTable(
        emptyDir,
        tableName = "src",
        replace = false,
        holdDDLTime = false)
    }

    test(s"$version: tableExists") {
      // No exception should be thrown
      assert(client.tableExists("default", "src"))
      assert(!client.tableExists("default", "nonexistent"))
    }

    test(s"$version: getTable") {
      // No exception should be thrown
      client.getTable("default", "src")
    }

    test(s"$version: getTableOption") {
      assert(client.getTableOption("default", "src").isDefined)
    }

    test(s"$version: alterTable(table: CatalogTable)") {
      val newTable = client.getTable("default", "src").copy(properties = Map("changed" -> ""))
      client.alterTable(newTable)
      assert(client.getTable("default", "src").properties.contains("changed"))
    }

    test(s"$version: alterTable(tableName: String, table: CatalogTable)") {
      val newTable = client.getTable("default", "src").copy(properties = Map("changedAgain" -> ""))
      client.alterTable("src", newTable)
      assert(client.getTable("default", "src").properties.contains("changedAgain"))
    }

    test(s"$version: listTables(database)") {
      assert(client.listTables("default") === Seq("src", "temporary"))
    }

    test(s"$version: listTables(database, pattern)") {
      assert(client.listTables("default", pattern = "src") === Seq("src"))
      assert(client.listTables("default", pattern = "nonexist").isEmpty)
    }

    test(s"$version: dropTable") {
      val versionsWithoutPurge = versions.takeWhile(_ != "0.14")
      // First try with the purge option set. This should fail if the version is < 0.14, in which
      // case we check the version and try without it.
      try {
        client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
          purge = true)
        assert(!versionsWithoutPurge.contains(version))
      } catch {
        case _: UnsupportedOperationException =>
          assert(versionsWithoutPurge.contains(version))
          client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
            purge = false)
      }
      assert(client.listTables("default") === Seq("src"))
    }

    ///////////////////////////////////////////////////////////////////////////
    // Partition related API
    ///////////////////////////////////////////////////////////////////////////

    val storageFormat = CatalogStorageFormat(
      locationUri = None,
      inputFormat = None,
      outputFormat = None,
      serde = None,
      compressed = false,
      properties = Map.empty)

    test(s"$version: sql create partitioned table") {
      client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
    }

    val testPartitionCount = 2
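    // The tests below create `testPartitionCount` partitions in src_part, all under key1=1
    // (key1=1/key2=1 and key1=1/key2=2), and exercise the partition-related client calls
    // against them.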

    test(s"$version: createPartitions") {
      val partitions = (1 to testPartitionCount).map { key2 =>
        CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
      }
      client.createPartitions(
        "default", "src_part", partitions, ignoreIfExists = true)
    }

    test(s"$version: getPartitionNames(catalogTable)") {
      val partitionNames = (1 to testPartitionCount).map(key2 => s"key1=1/key2=$key2")
      assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part")))
    }

    test(s"$version: getPartitions(catalogTable)") {
      assert(testPartitionCount ==
        client.getPartitions(client.getTable("default", "src_part")).size)
    }

    test(s"$version: getPartitionsByFilter") {
      // Only one partition [1, 1] for key2 == 1
      val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
        Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))

      // Hive 0.12 doesn't support getPartitionsByFilter; it ignores the filter condition.
      if (version != "0.12") {
        assert(result.size == 1)
      } else {
        assert(result.size == testPartitionCount)
      }
    }

    test(s"$version: getPartition") {
      // No exception should be thrown
      client.getPartition("default", "src_part", Map("key1" -> "1", "key2" -> "2"))
    }

    test(s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") {
      val partition = client.getPartitionOption(
        "default", "src_part", Map("key1" -> "1", "key2" -> "2"))
      assert(partition.isDefined)
    }

    test(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") {
      val partition = client.getPartitionOption(
        client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
      assert(partition.isDefined)
    }

    test(s"$version: getPartitions(db: String, table: String)") {
      assert(testPartitionCount == client.getPartitions("default", "src_part", None).size)
    }

    test(s"$version: loadPartition") {
      val partSpec = new java.util.LinkedHashMap[String, String]
      partSpec.put("key1", "1")
      partSpec.put("key2", "2")

      client.loadPartition(
        emptyDir,
        "default",
        "src_part",
        partSpec,
        replace = false,
        holdDDLTime = false,
        inheritTableSpecs = false)
    }

    test(s"$version: loadDynamicPartitions") {
      val partSpec = new java.util.LinkedHashMap[String, String]
      partSpec.put("key1", "1")
      partSpec.put("key2", "") // Dynamic partition

      client.loadDynamicPartitions(
        emptyDir,
        "default",
        "src_part",
        partSpec,
        replace = false,
        numDP = 1,
        holdDDLTime = false)
    }

    test(s"$version: renamePartitions") {
      val oldSpec = Map("key1" -> "1", "key2" -> "1")
      val newSpec = Map("key1" -> "1", "key2" -> "3")
      client.renamePartitions("default", "src_part", Seq(oldSpec), Seq(newSpec))

      // Checks the existence of the new partition (key1 = 1, key2 = 3)
      assert(client.getPartitionOption("default", "src_part", newSpec).isDefined)
    }

    test(s"$version: alterPartitions") {
      val spec = Map("key1" -> "1", "key2" -> "2")
      val newLocation = Utils.createTempDir().getPath()
      val storage = storageFormat.copy(
        locationUri = Some(newLocation),
        // needed for 0.12 alter partitions
        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
      val partition = CatalogTablePartition(spec, storage)
      client.alterPartitions("default", "src_part", Seq(partition))
      assert(client.getPartition("default", "src_part", spec)
        .storage.locationUri == Some(newLocation))
    }

    test(s"$version: dropPartitions") {
      val spec = Map("key1" -> "1", "key2" -> "3")
      val versionsWithoutPurge = versions.takeWhile(_ != "1.2")
      // Similar to dropTable; try with purge set, and if it fails, make sure we're running
      // with a version that is older than the minimum (1.2 in this case).
      try {
        client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
          purge = true, retainData = false)
        assert(!versionsWithoutPurge.contains(version))
      } catch {
        case _: UnsupportedOperationException =>
          assert(versionsWithoutPurge.contains(version))
          client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
            purge = false, retainData = false)
      }

      assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
    }

    ///////////////////////////////////////////////////////////////////////////
    // Function related API
    ///////////////////////////////////////////////////////////////////////////

    def function(name: String, className: String): CatalogFunction = {
      CatalogFunction(
        FunctionIdentifier(name, Some("default")), className, Seq.empty[FunctionResource])
    }

    test(s"$version: createFunction") {
      val functionClass = "org.apache.spark.MyFunc1"
      if (version == "0.12") {
        // Hive 0.12 doesn't support creating permanent functions
        intercept[AnalysisException] {
          client.createFunction("default", function("func1", functionClass))
        }
      } else {
        client.createFunction("default", function("func1", functionClass))
      }
    }

    test(s"$version: functionExists") {
      if (version == "0.12") {
        // Hive 0.12 doesn't allow customized permanent functions
        assert(client.functionExists("default", "func1") == false)
      } else {
        assert(client.functionExists("default", "func1") == true)
      }
    }

    test(s"$version: renameFunction") {
      if (version == "0.12") {
        // Hive 0.12 doesn't allow customized permanent functions
        intercept[NoSuchPermanentFunctionException] {
          client.renameFunction("default", "func1", "func2")
        }
      } else {
        client.renameFunction("default", "func1", "func2")
        assert(client.functionExists("default", "func2") == true)
      }
    }

    test(s"$version: alterFunction") {
      val functionClass = "org.apache.spark.MyFunc2"
      if (version == "0.12") {
        // Hive 0.12 doesn't allow customized permanent functions
        intercept[NoSuchPermanentFunctionException] {
          client.alterFunction("default", function("func2", functionClass))
        }
      } else {
        client.alterFunction("default", function("func2", functionClass))
      }
    }

    test(s"$version: getFunction") {
      if (version == "0.12") {
        // Hive 0.12 doesn't allow customized permanent functions
        intercept[NoSuchPermanentFunctionException] {
          client.getFunction("default", "func2")
        }
      } else {
        // No exception should be thrown
        val func = client.getFunction("default", "func2")
        assert(func.className == "org.apache.spark.MyFunc2")
      }
    }

    test(s"$version: getFunctionOption") {
      if (version == "0.12") {
        // Hive 0.12 doesn't allow customized permanent functions
        assert(client.getFunctionOption("default", "func2").isEmpty)
      } else {
        assert(client.getFunctionOption("default", "func2").isDefined)
        assert(client.getFunctionOption("default", "the_func_not_exists").isEmpty)
      }
    }

    test(s"$version: listFunctions") {
      if (version == "0.12") {
        // Hive 0.12 doesn't allow customized permanent functions
        assert(client.listFunctions("default", "fun.*").isEmpty)
      } else {
        assert(client.listFunctions("default", "fun.*").size == 1)
      }
    }

    test(s"$version: dropFunction") {
      if (version == "0.12") {
        // Hive 0.12 doesn't support creating permanent functions
        intercept[NoSuchPermanentFunctionException] {
          client.dropFunction("default", "func2")
        }
      } else {
        // No exception should be thrown
        client.dropFunction("default", "func2")
        assert(client.listFunctions("default", "fun.*").size == 0)
      }
    }

    ///////////////////////////////////////////////////////////////////////////
    // SQL related API
    ///////////////////////////////////////////////////////////////////////////

    test(s"$version: sql set command") {
      client.runSqlHive("SET spark.sql.test.key=1")
    }

    test(s"$version: sql create index and reset") {
      client.runSqlHive("CREATE TABLE indexed_table (key INT)")
      client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
        "as 'COMPACT' WITH DEFERRED REBUILD")
    }

    ///////////////////////////////////////////////////////////////////////////
    // Miscellaneous API
    ///////////////////////////////////////////////////////////////////////////

    test(s"$version: version") {
      assert(client.version.fullVersion.startsWith(version))
    }

    test(s"$version: getConf") {
      assert("success" === client.getConf("test", null))
    }

    test(s"$version: setOut") {
      client.setOut(new PrintStream(new ByteArrayOutputStream()))
    }

    test(s"$version: setInfo") {
      client.setInfo(new PrintStream(new ByteArrayOutputStream()))
    }

    test(s"$version: setError") {
      client.setError(new PrintStream(new ByteArrayOutputStream()))
    }

    test(s"$version: newSession") {
      val newClient = client.newSession()
      assert(newClient != null)
    }

    test(s"$version: withHiveState and addJar") {
      val newClassPath = "."
      client.addJar(newClassPath)
      client.withHiveState {
        // No exception should be thrown.
        // withHiveState changes the classloader to MutableURLClassLoader
        val classLoader = Thread.currentThread().getContextClassLoader
          .asInstanceOf[MutableURLClassLoader]

        val urls = classLoader.getURLs()
        urls.contains(new File(newClassPath).toURI.toURL)
      }
    }

    test(s"$version: reset") {
      // Clears all databases, tables, functions...
      client.reset()
      assert(client.listTables("default").isEmpty)
    }

    ///////////////////////////////////////////////////////////////////////////
    // End-To-End tests
    ///////////////////////////////////////////////////////////////////////////

    test(s"$version: CREATE TABLE AS SELECT") {
      withTable("tbl") {
        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
      }
    }

    test(s"$version: Delete the temporary staging directory and files after each insert") {
      withTempDir { tmpDir =>
        withTable("tab") {
          spark.sql(
            s"""
               |CREATE TABLE tab(c1 string)
               |location '${tmpDir.toURI.toString}'
             """.stripMargin)

          (1 to 3).map { i =>
            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
          }
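          // Recursively collects the names of all entries (files and directories) under `path`.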
          def listFiles(path: File): List[String] = {
            val dir = path.listFiles()
            val folders = dir.filter(_.isDirectory).toList
            val filePaths = dir.map(_.getName).toList
            folders.flatMap(listFiles) ++: filePaths
          }
          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
          assert(listFiles(tmpDir).sorted == expectedFiles)
        }
      }
    }

    // TODO: add more tests.
  }
}