/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File
import java.util.Locale
import java.util.concurrent.{Executors, TimeUnit}

import scala.util.Random

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => InferenceMode, _}
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._

/**
 * Tests schema inference for Hive external tables whose metastore entries do not record a
 * case-preserving schema, under each `HIVE_CASE_SENSITIVE_INFERENCE` mode, plus unit tests for
 * `HiveMetastoreCatalog.mergeWithMetastoreSchema`.
 */
class HiveSchemaInferenceSuite
  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {

  import HiveSchemaInferenceSuite._
  import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX

  override def beforeEach(): Unit = {
    super.beforeEach()
    FileStatusCache.resetForTesting()
  }

  override def afterEach(): Unit = {
    // Always hand control back to the parent fixtures, even if resetting the cache fails.
    try {
      FileStatusCache.resetForTesting()
    } finally {
      super.afterEach()
    }
  }

  private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
  private val client = externalCatalog.client

  /** Returns a copy of `schema` with every field name lower-cased (locale-independently). */
  private def lowerCaseSchema(schema: StructType): StructType = {
    StructType(schema.map(f => f.copy(name = f.name.toLowerCase(Locale.ROOT))))
  }

  /**
   * Writes `NUM_RECORDS` rows of test data into `dir` and registers the Hive external table
   * `TEST_TABLE_NAME` over it, with no Spark SQL schema properties attached.
   *
   * @param fileType      either `ORC_FILE_TYPE` or `PARQUET_FILE_TYPE`
   * @param fields        case-sensitive names of the data columns (typed as bigint)
   * @param partitionCols names of the partition columns (typed as int, stored lower-cased)
   * @param dir           directory holding the table's data files
   * @return the case-sensitive table schema: data fields followed by partition columns
   */
  private def setupExternalTable(
      fileType: String,
      fields: Seq[String],
      partitionCols: Seq[String],
      dir: File): StructType = {
    // Treat all table fields as bigints...
    val structFields = fields.map { field =>
      StructField(
        name = field,
        dataType = LongType,
        nullable = true,
        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build())
    }
    // ...and all partition columns as ints.
    val partitionStructFields = partitionCols.map { field =>
      StructField(
        // Partition column case isn't preserved
        name = field.toLowerCase(Locale.ROOT),
        dataType = IntegerType,
        nullable = true,
        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build())
    }
    val schema = StructType(structFields ++ partitionStructFields)

    // Write some test data (partitioned if specified). Every column is a copy of `id`, so all
    // values are in [0, NUM_RECORDS).
    val writer = spark.range(NUM_RECORDS)
      .selectExpr((fields ++ partitionCols).map("id as " + _): _*)
      .write
      .partitionBy(partitionCols: _*)
      .mode("overwrite")
    fileType match {
      case ORC_FILE_TYPE => writer.orc(dir.getAbsolutePath)
      case PARQUET_FILE_TYPE => writer.parquet(dir.getAbsolutePath)
      case other => fail(s"Unsupported file type: $other")
    }

    // Create the Hive external table. The schema passed here keeps the fields' case; the
    // assertions at the end of this method check that the table is nevertheless not
    // reported as case-preserving.
    val serde = HiveSerDe.sourceToSerDe(fileType)
      .getOrElse(fail(s"No Hive SerDe found for file type: $fileType"))
    client.createTable(
      CatalogTable(
        identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)),
        tableType = CatalogTableType.EXTERNAL,
        storage = CatalogStorageFormat(
          locationUri = Option(dir.getAbsolutePath),
          inputFormat = serde.inputFormat,
          outputFormat = serde.outputFormat,
          serde = serde.serde,
          compressed = false,
          properties = Map("serialization.format" -> "1")),
        schema = schema,
        provider = Option("hive"),
        partitionColumnNames = partitionCols.map(_.toLowerCase(Locale.ROOT)),
        properties = Map.empty),
      /* ignoreIfExists = */ true)

    // Add partition records (if specified)
    if (partitionCols.nonEmpty) {
      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
    }

    // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
    // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
    // set (table needs to be obtained from client since HiveExternalCatalog filters these
    // properties out).
    assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
    val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
    assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty)
    schema
  }

  /**
   * Runs `f` against both a partitioned and an unpartitioned external test table of the given
   * file type. `f` receives the data field names, the partition column names and the
   * case-sensitive schema returned by `setupExternalTable`.
   */
  private def withTestTables(
      fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = {
    // Test both a partitioned and unpartitioned Hive table
    val tableFields = Seq(
      (Seq("fieldOne"), Seq("partCol1", "partCol2")),
      (Seq("fieldOne", "fieldTwo"), Seq.empty[String]))

    tableFields.foreach { case (fields, partCols) =>
      withTempDir { dir =>
        // Set the table up inside withTable so it is dropped even when setup itself fails
        // part-way; otherwise a leftover table could leak into the next setup.
        withTable(TEST_TABLE_NAME) {
          val schema = setupExternalTable(fileType, fields, partCols, dir)
          f(fields, partCols, schema)
        }
      }
    }
  }

  /** Runs `f` once for each supported file type. */
  private def withFileTypes(f: String => Unit): Unit =
    Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f)

  /** Runs `f` with metastore ORC conversion enabled and the given case-inference mode. */
  private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = {
    withSQLConf(
      HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
      SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f)
  }

  /**
   * Filters the test table on each of the given columns, referenced by their case-sensitive
   * names, and checks that every row is returned (all generated values are non-negative).
   */
  private def testFieldQuery(fields: Seq[String]): Unit = {
    // Query every field rather than a randomly chosen one so failures are reproducible.
    fields.foreach { field =>
      val query = s"SELECT * FROM $TEST_TABLE_NAME WHERE $field >= 0"
      assert(spark.sql(query).count() == NUM_RECORDS)
    }
  }

  /** Asserts that the test table's schema equals `expectedSchema`, ignoring field metadata. */
  private def testTableSchema(expectedSchema: StructType): Unit = {
    // Spark 2.1 doesn't add metadata for partition columns when the schema isn't read from the
    // table properties so strip all field metadata before making the comparison.
    def stripMetadata(schema: StructType): StructType =
      StructType(schema.map(_.copy(metadata = Metadata.empty)))
    assert(stripMetadata(spark.table(TEST_TABLE_NAME).schema) == stripMetadata(expectedSchema))
  }

  withFileTypes { fileType =>
    test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") {
      withInferenceMode(INFER_AND_SAVE) {
        withTestTables(fileType) { (fields, partCols, schema) =>
          testFieldQuery(fields)
          testFieldQuery(partCols)
          testTableSchema(schema)

          // Verify the catalog table now contains the updated schema and properties
          val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
          assert(catalogTable.schemaPreservesCase)
          assert(catalogTable.schema == schema)
          assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase(Locale.ROOT)))
        }
      }
    }
  }

  withFileTypes { fileType =>
    test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") {
      withInferenceMode(INFER_ONLY) {
        withTestTables(fileType) { (fields, partCols, schema) =>
          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
          testFieldQuery(fields)
          testFieldQuery(partCols)
          testTableSchema(schema)
          // Catalog table shouldn't be altered
          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
        }
      }
    }
  }

  withFileTypes { fileType =>
    test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") {
      withInferenceMode(NEVER_INFER) {
        withTestTables(fileType) { (fields, partCols, schema) =>
          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
          // Only check the table schema as the test queries will break
          testTableSchema(lowerCaseSchema(schema))
          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
        }
      }
    }
  }

  test("mergeWithMetastoreSchema() should return expected results") {
    // Asserts that merging the two schemas fails with a schema-conflict error.
    def assertConflictingSchemas(
        metastoreSchema: StructType,
        inferredSchema: StructType): Unit = {
      val e = intercept[Throwable] {
        HiveMetastoreCatalog.mergeWithMetastoreSchema(metastoreSchema, inferredSchema)
      }
      assert(e.getMessage.contains("Detected conflicting schemas"))
    }

    // Field type conflict resolution
    assertResult(
      StructType(Seq(
        StructField("lowerCase", StringType),
        StructField("UPPERCase", DoubleType, nullable = false)))) {

      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("lowercase", StringType),
          StructField("uppercase", DoubleType, nullable = false))),

        StructType(Seq(
          StructField("lowerCase", BinaryType),
          StructField("UPPERCase", IntegerType, nullable = true))))
    }

    // MetaStore schema is subset of parquet schema
    assertResult(
      StructType(Seq(
        StructField("UPPERCase", DoubleType, nullable = false)))) {

      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("uppercase", DoubleType, nullable = false))),

        StructType(Seq(
          StructField("lowerCase", BinaryType),
          StructField("UPPERCase", IntegerType, nullable = true))))
    }

    // Metastore schema contains additional non-nullable fields.
    assertConflictingSchemas(
      StructType(Seq(
        StructField("uppercase", DoubleType, nullable = false),
        StructField("lowerCase", BinaryType, nullable = false))),

      StructType(Seq(
        StructField("UPPERCase", IntegerType, nullable = true))))

    // Conflicting non-nullable field names
    assertConflictingSchemas(
      StructType(Seq(StructField("lower", StringType, nullable = false))),
      StructType(Seq(StructField("lowerCase", BinaryType))))

    // Check that merging missing nullable fields works as expected.
    assertResult(
      StructType(Seq(
        StructField("firstField", StringType, nullable = true),
        StructField("secondField", StringType, nullable = true),
        StructField("thirdfield", StringType, nullable = true)))) {
      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("firstfield", StringType, nullable = true),
          StructField("secondfield", StringType, nullable = true),
          StructField("thirdfield", StringType, nullable = true))),
        StructType(Seq(
          StructField("firstField", StringType, nullable = true),
          StructField("secondField", StringType, nullable = true))))
    }

    // Merge should fail if the Metastore contains any additional fields that are not
    // nullable.
    assertConflictingSchemas(
      StructType(Seq(
        StructField("firstfield", StringType, nullable = true),
        StructField("secondfield", StringType, nullable = true),
        StructField("thirdfield", StringType, nullable = false))),
      StructType(Seq(
        StructField("firstField", StringType, nullable = true),
        StructField("secondField", StringType, nullable = true))))

    // Schema merge should maintain metastore order.
    assertResult(
      StructType(Seq(
        StructField("first_field", StringType, nullable = true),
        StructField("second_field", StringType, nullable = true),
        StructField("third_field", StringType, nullable = true),
        StructField("fourth_field", StringType, nullable = true),
        StructField("fifth_field", StringType, nullable = true)))) {
      HiveMetastoreCatalog.mergeWithMetastoreSchema(
        StructType(Seq(
          StructField("first_field", StringType, nullable = true),
          StructField("second_field", StringType, nullable = true),
          StructField("third_field", StringType, nullable = true),
          StructField("fourth_field", StringType, nullable = true),
          StructField("fifth_field", StringType, nullable = true))),
        StructType(Seq(
          StructField("fifth_field", StringType, nullable = true),
          StructField("third_field", StringType, nullable = true),
          StructField("second_field", StringType, nullable = true))))
    }
  }
}

object HiveSchemaInferenceSuite {
  private val NUM_RECORDS = 10
  private val DATABASE = "default"
  private val TEST_TABLE_NAME = "test_table"
  private val ORC_FILE_TYPE = "orc"
  private val PARQUET_FILE_TYPE = "parquet"
}