/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

import scala.util.Random

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => InferenceMode, _}
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._

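/**
 * Suite covering Hive table schema inference: it checks the behavior of each
 * HIVE_CASE_SENSITIVE_INFERENCE mode (INFER_AND_SAVE, INFER_ONLY, NEVER_INFER) against ORC- and
 * Parquet-backed Hive tables, as well as HiveMetastoreCatalog.mergeWithMetastoreSchema().
 */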
class HiveSchemaInferenceSuite
  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {

  import HiveSchemaInferenceSuite._
  import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX

  override def beforeEach(): Unit = {
    super.beforeEach()
    FileStatusCache.resetForTesting()
  }

  override def afterEach(): Unit = {
    super.afterEach()
    spark.sessionState.catalog.tableRelationCache.invalidateAll()
    FileStatusCache.resetForTesting()
  }

  private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
  private val client = externalCatalog.client

  // Return a copy of the given schema with all field names converted to lower case.
  private def lowerCaseSchema(schema: StructType): StructType = {
    StructType(schema.map(f => f.copy(name = f.name.toLowerCase)))
  }

  // Create a Hive external test table containing the given field and partition column names.
  // Returns a case-sensitive schema for the table.
  private def setupExternalTable(
      fileType: String,
      fields: Seq[String],
      partitionCols: Seq[String],
      dir: File): StructType = {
    // Treat all table fields as bigints...
    val structFields = fields.map { field =>
      StructField(
        name = field,
        dataType = LongType,
        nullable = true,
        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build())
    }
    // ...and all partition columns as ints.
    val partitionStructFields = partitionCols.map { field =>
      StructField(
        // Partition column case isn't preserved
        name = field.toLowerCase,
        dataType = IntegerType,
        nullable = true,
        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build())
    }
    val schema = StructType(structFields ++ partitionStructFields)

    // Write some test data (partitioned if partition columns are specified).
    val writer = spark.range(NUM_RECORDS)
      .selectExpr((fields ++ partitionCols).map("id as " + _): _*)
      .write
      .partitionBy(partitionCols: _*)
      .mode("overwrite")
    fileType match {
      case ORC_FILE_TYPE =>
        writer.orc(dir.getAbsolutePath)
      case PARQUET_FILE_TYPE =>
        writer.parquet(dir.getAbsolutePath)
    }

    // Create a Hive external table with a lower-cased schema.
    val serde = HiveSerDe.sourceToSerDe(fileType).get
    client.createTable(
      CatalogTable(
        identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)),
        tableType = CatalogTableType.EXTERNAL,
        storage = CatalogStorageFormat(
          locationUri = Option(dir.getAbsolutePath),
          inputFormat = serde.inputFormat,
          outputFormat = serde.outputFormat,
          serde = serde.serde,
          compressed = false,
          properties = Map("serialization.format" -> "1")),
        schema = schema,
        provider = Option("hive"),
        partitionColumnNames = partitionCols.map(_.toLowerCase),
        properties = Map.empty),
      ignoreIfExists = true)

    // Add partition records (if specified).
    if (partitionCols.nonEmpty) {
      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
    }

    // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
    // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
    // set (the table needs to be obtained from the client since HiveExternalCatalog filters these
    // properties out).
    assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
    val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
    assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty)
    schema
  }

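  // Set up a test table of the given file type and pass its field names, partition column names
  // and expected case-sensitive schema to the given assertion function. Covers both a partitioned
  // and an unpartitioned table.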
  private def withTestTables(
      fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = {
    // Test both a partitioned and unpartitioned Hive table
    val tableFields = Seq(
      (Seq("fieldOne"), Seq("partCol1", "partCol2")),
      (Seq("fieldOne", "fieldTwo"), Seq.empty[String]))

    tableFields.foreach { case (fields, partCols) =>
      withTempDir { dir =>
        val schema = setupExternalTable(fileType, fields, partCols, dir)
        withTable(TEST_TABLE_NAME) { f(fields, partCols, schema) }
      }
    }
  }

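  // Run the given test body once per supported file type (ORC and Parquet).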
  private def withFileTypes(f: String => Unit): Unit =
    Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f)

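  // Run the given block with HIVE_CASE_SENSITIVE_INFERENCE set to the given mode. Metastore ORC
  // conversion is also enabled so that ORC tables go through the data source conversion path
  // where schema inference is applied.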
  private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = {
    withSQLConf(
      HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
      SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f)
  }

  private val inferenceKey = SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key

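  // Query the test table filtering on a randomly chosen field and check that all records are
  // returned, verifying that the (possibly mixed-case) field names resolve correctly.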
  private def testFieldQuery(fields: Seq[String]): Unit = {
    if (fields.nonEmpty) {
      val query = s"SELECT * FROM ${TEST_TABLE_NAME} WHERE ${Random.shuffle(fields).head} >= 0"
      assert(spark.sql(query).count() == NUM_RECORDS)
    }
  }

  private def testTableSchema(expectedSchema: StructType): Unit = {
    // Spark 2.1 doesn't add metadata for partition columns when the schema isn't read from the
    // table properties, so strip all field metadata before making the comparison.
    val tableSchema =
      StructType(spark.table(TEST_TABLE_NAME).schema.map(_.copy(metadata = Metadata.empty)))
    val expected =
      StructType(expectedSchema.map(_.copy(metadata = Metadata.empty)))
    assert(tableSchema == expected)
  }

  withFileTypes { fileType =>
    test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") {
      withInferenceMode(INFER_AND_SAVE) {
        withTestTables(fileType) { (fields, partCols, schema) =>
          testFieldQuery(fields)
          testFieldQuery(partCols)
          testTableSchema(schema)

          // Verify the catalog table now contains the updated schema and properties
          val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
          assert(catalogTable.schemaPreservesCase)
          assert(catalogTable.schema == schema)
          assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase))
        }
      }
    }
  }

  withFileTypes { fileType =>
    test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") {
      withInferenceMode(INFER_ONLY) {
        withTestTables(fileType) { (fields, partCols, schema) =>
          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
          testFieldQuery(fields)
          testFieldQuery(partCols)
          testTableSchema(schema)
          // Catalog table shouldn't be altered
          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
        }
      }
    }
  }

  withFileTypes { fileType =>
    test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") {
      withInferenceMode(NEVER_INFER) {
        withTestTables(fileType) { (fields, partCols, schema) =>
          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
          // Only check the table schema here: the field queries would break since NEVER_INFER
          // leaves the lower-cased metastore schema in place, which doesn't match the
          // case-sensitive field names in the data files.
          testTableSchema(lowerCaseSchema(schema))
          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
        }
      }
    }
  }

  test("mergeWithMetastoreSchema() should return expected results") {
    // Field type conflict resolution
    assertResult(
      StructType(Seq(
        StructField("lowerCase", StringType),
        StructField("UPPERCase", DoubleType, nullable = false)))) {

      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("lowercase", StringType),
          StructField("uppercase", DoubleType, nullable = false))),

        StructType(Seq(
          StructField("lowerCase", BinaryType),
          StructField("UPPERCase", IntegerType, nullable = true))))
    }

    // Metastore schema is a subset of the inferred schema
    assertResult(
      StructType(Seq(
        StructField("UPPERCase", DoubleType, nullable = false)))) {

      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("uppercase", DoubleType, nullable = false))),

        StructType(Seq(
          StructField("lowerCase", BinaryType),
          StructField("UPPERCase", IntegerType, nullable = true))))
    }

    // Metastore schema contains additional non-nullable fields.
    assert(intercept[Throwable] {
      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("uppercase", DoubleType, nullable = false),
          StructField("lowerCase", BinaryType, nullable = false))),

        StructType(Seq(
          StructField("UPPERCase", IntegerType, nullable = true))))
    }.getMessage.contains("Detected conflicting schemas"))

    // Conflicting non-nullable field names
    intercept[Throwable] {
      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(StructField("lower", StringType, nullable = false))),
        StructType(Seq(StructField("lowerCase", BinaryType))))
    }

    // Check that merging missing nullable fields works as expected.
    assertResult(
      StructType(Seq(
        StructField("firstField", StringType, nullable = true),
        StructField("secondField", StringType, nullable = true),
        StructField("thirdfield", StringType, nullable = true)))) {
      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("firstfield", StringType, nullable = true),
          StructField("secondfield", StringType, nullable = true),
          StructField("thirdfield", StringType, nullable = true))),
        StructType(Seq(
          StructField("firstField", StringType, nullable = true),
          StructField("secondField", StringType, nullable = true))))
    }

    // Merge should fail if the metastore contains any additional fields that are not
    // nullable.
    assert(intercept[Throwable] {
      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("firstfield", StringType, nullable = true),
          StructField("secondfield", StringType, nullable = true),
          StructField("thirdfield", StringType, nullable = false))),
        StructType(Seq(
          StructField("firstField", StringType, nullable = true),
          StructField("secondField", StringType, nullable = true))))
    }.getMessage.contains("Detected conflicting schemas"))

    // Schema merge should maintain metastore order.
    assertResult(
      StructType(Seq(
        StructField("first_field", StringType, nullable = true),
        StructField("second_field", StringType, nullable = true),
        StructField("third_field", StringType, nullable = true),
        StructField("fourth_field", StringType, nullable = true),
        StructField("fifth_field", StringType, nullable = true)))) {
      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("first_field", StringType, nullable = true),
          StructField("second_field", StringType, nullable = true),
          StructField("third_field", StringType, nullable = true),
          StructField("fourth_field", StringType, nullable = true),
          StructField("fifth_field", StringType, nullable = true))),
        StructType(Seq(
          StructField("fifth_field", StringType, nullable = true),
          StructField("third_field", StringType, nullable = true),
          StructField("second_field", StringType, nullable = true))))
    }
  }
}

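/** Constants shared by the test cases above. */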
object HiveSchemaInferenceSuite {
  private val NUM_RECORDS = 10
  private val DATABASE = "default"
  private val TEST_TABLE_NAME = "test_table"
  private val ORC_FILE_TYPE = "orc"
  private val PARQUET_FILE_TYPE = "parquet"
}