/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File

import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils

/**
 * Tests for caching and uncaching tables through the Hive-enabled session: the
 * `CACHE TABLE` / `UNCACHE TABLE` / `REFRESH` statements, the catalog API, and the
 * interaction of cached data with inserts and drops.
 */
class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
  import hiveContext._

  /**
   * Returns the id of the RDD that holds the cached column buffers of `tableName`.
   *
   * The physical plan of the table is searched for in-memory scan nodes and the first one
   * found is used. The test fails if the plan contains no such node, i.e. the table is not
   * cached.
   *
   * @param tableName name of a table that is expected to be cached
   * @return id of the cached column buffer RDD
   */
  def rddIdOf(tableName: String): Int = {
    val plan = table(tableName).queryExecution.sparkPlan
    // Only in-memory scans are matched; other plan nodes are skipped instead of aborting
    // the lookup, so a cached scan is found wherever it sits in the plan.
    val cachedRddIds = plan.collect {
      case InMemoryTableScanExec(_, _, relation) =>
        relation.cachedColumnBuffers.id
    }
    cachedRddIds.headOption.getOrElse {
      fail(s"Table $tableName is not cached\n" + plan)
    }
  }

  /**
   * Checks whether block 0 of the RDD with the given id can be fetched from the block manager.
   *
   * @param rddId id of the RDD to probe
   * @return true if the block was found
   */
  def isMaterialized(rddId: Int): Boolean = {
    val blockId = RDDBlockId(rddId, 0)
    val blockManager = sparkContext.env.blockManager
    val maybeBlock = blockManager.get(blockId)
    // A successful `get` is paired with a `releaseLock` on the same block id.
    maybeBlock.foreach(_ => blockManager.releaseLock(blockId))
    maybeBlock.nonEmpty
  }

  /**
   * Shared scenario of the REFRESH tests. An external parquet table `refreshTable` is created
   * over a temporary directory and cached, more data is appended to the directory, and then:
   *  - before `refresh` runs, the cached table must still return the old data;
   *  - after `refresh` runs, the table must still be cached and return the new data;
   *  - on a re-created, uncached table, `refresh` must not cause it to become cached.
   *
   * The table and the temporary directory are cleaned up even if an assertion fails.
   *
   * @param refresh issues the refresh command under test; it receives the directory that
   *                backs `refreshTable`
   */
  private def checkRefreshRecachesData(refresh: File => Unit): Unit = {
    val tempPath: File = Utils.createTempDir()
    try {
      // Only the path is needed; the directory itself is written by the parquet writer.
      tempPath.delete()
      withTable("refreshTable") {
        table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
        sql("DROP TABLE IF EXISTS refreshTable")
        sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
        checkAnswer(
          table("refreshTable"),
          table("src").collect())
        // Cache the table.
        sql("CACHE TABLE refreshTable")
        assertCached(table("refreshTable"))
        // Append new data.
        table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
        // We are still using the old data.
        assertCached(table("refreshTable"))
        checkAnswer(
          table("refreshTable"),
          table("src").collect())
        // Refresh the table.
        refresh(tempPath)
        // We are using the new data.
        assertCached(table("refreshTable"))
        checkAnswer(
          table("refreshTable"),
          table("src").union(table("src")).collect())

        // Drop the table and create it again.
        sql("DROP TABLE refreshTable")
        sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
        // It is not cached.
        assert(!isCached("refreshTable"), "refreshTable should not be cached.")
        // Refresh the table. The refresh command should not make an uncached
        // table cached.
        refresh(tempPath)
        checkAnswer(
          table("refreshTable"),
          table("src").union(table("src")).collect())
        // It is not cached.
        assert(!isCached("refreshTable"), "refreshTable should not be cached.")
      }
    } finally {
      Utils.deleteRecursively(tempPath)
    }
  }

  test("cache table") {
    val preCacheResults = sql("SELECT * FROM src").collect().toSeq

    cacheTable("src")
    assertCached(sql("SELECT * FROM src"))

    checkAnswer(
      sql("SELECT * FROM src"),
      preCacheResults)

    // The cached data must also be used when the table is referenced through an alias.
    assertCached(sql("SELECT * FROM src s"))

    checkAnswer(
      sql("SELECT * FROM src s"),
      preCacheResults)

    uncacheTable("src")
    assertCached(sql("SELECT * FROM src"), 0)
  }

  test("cache invalidation") {
    // withTable drops the table on exit, including when an assertion below fails.
    withTable("cachedTable") {
      sql("CREATE TABLE cachedTable(key INT, value STRING)")

      sql("INSERT INTO TABLE cachedTable SELECT * FROM src")
      checkAnswer(sql("SELECT * FROM cachedTable"), table("src").collect().toSeq)

      cacheTable("cachedTable")
      checkAnswer(sql("SELECT * FROM cachedTable"), table("src").collect().toSeq)

      // A second insert must be visible even though the table was cached before it.
      sql("INSERT INTO TABLE cachedTable SELECT * FROM src")
      checkAnswer(
        sql("SELECT * FROM cachedTable"),
        table("src").collect().toSeq ++ table("src").collect().toSeq)
    }
  }

  test("Drop cached table") {
    // The explicit DROP is the behavior under test; withTable only guards the failure path.
    withTable("cachedTableTest") {
      sql("CREATE TABLE cachedTableTest(a INT)")
      cacheTable("cachedTableTest")
      sql("SELECT * FROM cachedTableTest").collect()
      sql("DROP TABLE cachedTableTest")
      intercept[AnalysisException] {
        sql("SELECT * FROM cachedTableTest").collect()
      }
    }
  }

  test("DROP nonexistant table") {
    sql("DROP TABLE IF EXISTS nonexistantTable")
  }

  test("uncache of nonexistant tables") {
    // make sure table doesn't exist
    intercept[NoSuchTableException](spark.table("nonexistantTable"))
    intercept[NoSuchTableException] {
      spark.catalog.uncacheTable("nonexistantTable")
    }
    intercept[NoSuchTableException] {
      sql("UNCACHE TABLE nonexistantTable")
    }
    sql("UNCACHE TABLE IF EXISTS nonexistantTable")
  }

  test("no error on uncache of non-cached table") {
    val tableName = "newTable"
    withTable(tableName) {
      sql(s"CREATE TABLE $tableName(a INT)")
      // no error will be reported in the following three ways to uncache a table.
      spark.catalog.uncacheTable(tableName)
      sql(s"UNCACHE TABLE $tableName")
      sparkSession.table(tableName).unpersist()
    }
  }

  test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
    sql("CACHE TABLE src")
    assertCached(table("src"))
    assert(spark.catalog.isCached("src"), "Table 'src' should be cached")

    sql("UNCACHE TABLE src")
    assertCached(table("src"), 0)
    assert(!spark.catalog.isCached("src"), "Table 'src' should not be cached")
  }

  test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
    withTempView("testCacheTable") {
      sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
      assertCached(table("testCacheTable"))

      val rddId = rddIdOf("testCacheTable")
      assert(
        isMaterialized(rddId),
        "Eagerly cached in-memory table should have already been materialized")

      uncacheTable("testCacheTable")
      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
    }
  }

  test("CACHE TABLE tableName AS SELECT ...") {
    withTempView("testCacheTable") {
      sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
      assertCached(table("testCacheTable"))

      val rddId = rddIdOf("testCacheTable")
      assert(
        isMaterialized(rddId),
        "Eagerly cached in-memory table should have already been materialized")

      uncacheTable("testCacheTable")
      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
    }
  }

  test("CACHE LAZY TABLE tableName") {
    sql("CACHE LAZY TABLE src")
    assertCached(table("src"))

    val rddId = rddIdOf("src")
    assert(
      !isMaterialized(rddId),
      "Lazily cached in-memory table shouldn't be materialized eagerly")

    // Running a query over the table is what is expected to materialize the cache.
    sql("SELECT COUNT(*) FROM src").collect()
    assert(
      isMaterialized(rddId),
      "Lazily cached in-memory table should have been materialized")

    uncacheTable("src")
    assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
  }

  test("CACHE TABLE with Hive UDF") {
    withTempView("udfTest") {
      sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
      assertCached(table("udfTest"))
      uncacheTable("udfTest")
    }
  }

  test("REFRESH TABLE also needs to recache the data (data source tables)") {
    checkRefreshRecachesData(_ => sql("REFRESH TABLE refreshTable"))
  }

  test("SPARK-15678: REFRESH PATH") {
    checkRefreshRecachesData(path => sql(s"REFRESH ${path.toString}"))
  }

  test("Cache/Uncache Qualified Tables") {
    withTempDatabase { db =>
      withTempView("cachedTable") {
        sql(s"CREATE TABLE $db.cachedTable STORED AS PARQUET AS SELECT 1")
        sql(s"CACHE TABLE $db.cachedTable")
        assertCached(spark.table(s"$db.cachedTable"))

        // Inside the database the unqualified name must refer to the same cached table.
        activateDatabase(db) {
          assertCached(spark.table("cachedTable"))
          sql("UNCACHE TABLE cachedTable")
          assert(!spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should not be cached")
          sql("CACHE TABLE cachedTable")
          assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached")
        }

        sql(s"UNCACHE TABLE $db.cachedTable")
        assert(!spark.catalog.isCached(s"$db.cachedTable"),
          "Table 'cachedTable' should not be cached")
      }
    }
  }

  test("Cache Table As Select - having database name") {
    withTempDatabase { db =>
      withTempView("cachedTable") {
        val e = intercept[ParseException] {
          sql(s"CACHE TABLE $db.cachedTable AS SELECT 1")
        }.getMessage
        assert(e.contains("It is not allowed to add database prefix ") &&
          e.contains("to the table name in CACHE TABLE AS SELECT"))
      }
    }
  }

  test("SPARK-11246 cache parquet table") {
    // withTable drops the table on exit, including when the assertion below fails.
    withTable("cachedTable") {
      sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")

      cacheTable("cachedTable")
      val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan
      assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 1)
    }
  }

  test("cache a table using CatalogFileIndex") {
    withTable("test") {
      sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
      val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")

      val dataSchema = StructType(tableMeta.schema.filterNot { f =>
        tableMeta.partitionColumnNames.contains(f.name)
      })

      // Builds a new logical relation over a newly created CatalogFileIndex on every call,
      // so the two plans compared below share no file index or relation instance.
      def newPlan(): LogicalRelation = {
        val relation = HadoopFsRelation(
          location = new CatalogFileIndex(spark, tableMeta, 0),
          partitionSchema = tableMeta.partitionSchema,
          dataSchema = dataSchema,
          bucketSpec = None,
          fileFormat = new ParquetFileFormat(),
          options = Map.empty)(sparkSession = spark)
        LogicalRelation(relation, catalogTable = Some(tableMeta))
      }

      val plan = newPlan()
      spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))

      assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)

      // A separately constructed plan over the same table must resolve to the cached data.
      val samePlan = newPlan()

      assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
    }
  }
}