/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File

import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils

class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
  import hiveContext._

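  /**
   * Returns the id of the RDD that backs the cached in-memory relation of `tableName`,
   * failing the test if the table's physical plan contains no in-memory table scan.
   */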
  def rddIdOf(tableName: String): Int = {
    val plan = table(tableName).queryExecution.sparkPlan
    plan.collect {
      case InMemoryTableScanExec(_, _, relation) =>
        relation.cachedColumnBuffers.id
      case _ =>
        fail(s"Table $tableName is not cached\n" + plan)
    }.head
  }

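  /**
   * Returns true if the first block of the given RDD is present in the block manager,
   * i.e. the cached data has actually been materialized. The read lock acquired by
   * `get` is released immediately so the check has no side effects.
   */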
  def isMaterialized(rddId: Int): Boolean = {
    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
    maybeBlock.nonEmpty
  }

  test("cache table") {
    val preCacheResults = sql("SELECT * FROM src").collect().toSeq

    cacheTable("src")
    assertCached(sql("SELECT * FROM src"))

    checkAnswer(
      sql("SELECT * FROM src"),
      preCacheResults)

    assertCached(sql("SELECT * FROM src s"))

    checkAnswer(
      sql("SELECT * FROM src s"),
      preCacheResults)

    uncacheTable("src")
    assertCached(sql("SELECT * FROM src"), 0)
  }

  test("cache invalidation") {
    sql("CREATE TABLE cachedTable(key INT, value STRING)")

    sql("INSERT INTO TABLE cachedTable SELECT * FROM src")
    checkAnswer(sql("SELECT * FROM cachedTable"), table("src").collect().toSeq)

    cacheTable("cachedTable")
    checkAnswer(sql("SELECT * FROM cachedTable"), table("src").collect().toSeq)

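    // A second insert into the cached table should invalidate the cached data,
    // so the query sees both copies of src.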
    sql("INSERT INTO TABLE cachedTable SELECT * FROM src")
    checkAnswer(
      sql("SELECT * FROM cachedTable"),
      table("src").collect().toSeq ++ table("src").collect().toSeq)

    sql("DROP TABLE cachedTable")
  }

  test("Drop cached table") {
    sql("CREATE TABLE cachedTableTest(a INT)")
    cacheTable("cachedTableTest")
    sql("SELECT * FROM cachedTableTest").collect()
    sql("DROP TABLE cachedTableTest")
    intercept[AnalysisException] {
      sql("SELECT * FROM cachedTableTest").collect()
    }
  }

  test("DROP nonexistent table") {
    sql("DROP TABLE IF EXISTS nonexistentTable")
  }

  test("uncache of nonexistent tables") {
    // make sure the table doesn't exist
    intercept[NoSuchTableException](spark.table("nonexistentTable"))
    intercept[NoSuchTableException] {
      spark.catalog.uncacheTable("nonexistentTable")
    }
    intercept[NoSuchTableException] {
      sql("UNCACHE TABLE nonexistentTable")
    }
    sql("UNCACHE TABLE IF EXISTS nonexistentTable")
  }

  test("no error on uncache of non-cached table") {
    val tableName = "newTable"
    withTable(tableName) {
      sql(s"CREATE TABLE $tableName(a INT)")
      // None of the following three ways of uncaching a non-cached table should report an error.
      spark.catalog.uncacheTable(tableName)
      sql(s"UNCACHE TABLE $tableName")
      sparkSession.table(tableName).unpersist()
    }
  }

  test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statements") {
    sql("CACHE TABLE src")
    assertCached(table("src"))
    assert(spark.catalog.isCached("src"), "Table 'src' should be cached")

    sql("UNCACHE TABLE src")
    assertCached(table("src"), 0)
    assert(!spark.catalog.isCached("src"), "Table 'src' should not be cached")
  }

  test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
    withTempView("testCacheTable") {
      sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
      assertCached(table("testCacheTable"))

      val rddId = rddIdOf("testCacheTable")
      assert(
        isMaterialized(rddId),
        "Eagerly cached in-memory table should have already been materialized")

      uncacheTable("testCacheTable")
      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
    }
  }

  test("CACHE TABLE tableName AS SELECT ...") {
    withTempView("testCacheTable") {
      sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
      assertCached(table("testCacheTable"))

      val rddId = rddIdOf("testCacheTable")
      assert(
        isMaterialized(rddId),
        "Eagerly cached in-memory table should have already been materialized")

      uncacheTable("testCacheTable")
      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
    }
  }

  test("CACHE LAZY TABLE tableName") {
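    // CACHE LAZY only registers the table for caching; no RDD blocks are
    // materialized until a query actually reads the table.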
    sql("CACHE LAZY TABLE src")
    assertCached(table("src"))

    val rddId = rddIdOf("src")
    assert(
      !isMaterialized(rddId),
      "Lazily cached in-memory table shouldn't be materialized eagerly")

    sql("SELECT COUNT(*) FROM src").collect()
    assert(
      isMaterialized(rddId),
      "Lazily cached in-memory table should have been materialized")

    uncacheTable("src")
    assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
  }

  test("CACHE TABLE with Hive UDF") {
    withTempView("udfTest") {
      sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
      assertCached(table("udfTest"))
      uncacheTable("udfTest")
    }
  }

  test("REFRESH TABLE also needs to recache the data (data source tables)") {
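    // Write `src` out as Parquet at a temporary location, then register an
    // external table pointing at that path.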
    val tempPath: File = Utils.createTempDir()
    tempPath.delete()
    table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
    sql("DROP TABLE IF EXISTS refreshTable")
    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
    checkAnswer(table("refreshTable"), table("src"))
    // Cache the table.
    sql("CACHE TABLE refreshTable")
    assertCached(table("refreshTable"))
    // Append new data.
    table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
    // We are still using the old data.
    assertCached(table("refreshTable"))
    checkAnswer(
      table("refreshTable"),
      table("src").collect())
    // Refresh the table.
    sql("REFRESH TABLE refreshTable")
    // We are using the new data.
    assertCached(table("refreshTable"))
    checkAnswer(
      table("refreshTable"),
      table("src").union(table("src")).collect())

    // Drop the table and create it again.
    sql("DROP TABLE refreshTable")
    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
    // It is not cached.
    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
    // Refresh the table. The REFRESH TABLE command should not cause an uncached
    // table to become cached.
    sql("REFRESH TABLE refreshTable")
    checkAnswer(
      table("refreshTable"),
      table("src").union(table("src")).collect())
    // It is not cached.
    assert(!isCached("refreshTable"), "refreshTable should not be cached.")

    sql("DROP TABLE refreshTable")
    Utils.deleteRecursively(tempPath)
  }

  test("SPARK-15678: REFRESH PATH") {
    val tempPath: File = Utils.createTempDir()
    tempPath.delete()
    table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
    sql("DROP TABLE IF EXISTS refreshTable")
    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
    checkAnswer(
      table("refreshTable"),
      table("src").collect())
    // Cache the table.
    sql("CACHE TABLE refreshTable")
    assertCached(table("refreshTable"))
    // Append new data.
    table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
    // We are still using the old data.
    assertCached(table("refreshTable"))
    checkAnswer(
      table("refreshTable"),
      table("src").collect())
    // Refresh the underlying path rather than the table name.
    sql(s"REFRESH ${tempPath.toString}")
    // We are using the new data.
    assertCached(table("refreshTable"))
    checkAnswer(
      table("refreshTable"),
      table("src").union(table("src")).collect())

    // Drop the table and create it again.
    sql("DROP TABLE refreshTable")
    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
    // It is not cached.
    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
    // Refresh the path. The REFRESH command should not cause an uncached
    // table to become cached.
    sql(s"REFRESH ${tempPath.toString}")
    checkAnswer(
      table("refreshTable"),
      table("src").union(table("src")).collect())
    // It is not cached.
    assert(!isCached("refreshTable"), "refreshTable should not be cached.")

    sql("DROP TABLE refreshTable")
    Utils.deleteRecursively(tempPath)
  }

  test("Cache/Uncache Qualified Tables") {
    withTempDatabase { db =>
      withTempView("cachedTable") {
        sql(s"CREATE TABLE $db.cachedTable STORED AS PARQUET AS SELECT 1")
        sql(s"CACHE TABLE $db.cachedTable")
        assertCached(spark.table(s"$db.cachedTable"))

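        // With `db` as the current database, the unqualified name should
        // resolve to the same cached table.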
        activateDatabase(db) {
          assertCached(spark.table("cachedTable"))
          sql("UNCACHE TABLE cachedTable")
          assert(!spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should not be cached")
          sql("CACHE TABLE cachedTable")
          assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached")
        }

        sql(s"UNCACHE TABLE $db.cachedTable")
        assert(!spark.catalog.isCached(s"$db.cachedTable"),
          "Table 'cachedTable' should not be cached")
      }
    }
  }

  test("Cache Table As Select - having database name") {
    withTempDatabase { db =>
      withTempView("cachedTable") {
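        // CACHE TABLE ... AS SELECT creates a temporary view, so a
        // database-qualified name should be rejected at parse time.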
        val e = intercept[ParseException] {
          sql(s"CACHE TABLE $db.cachedTable AS SELECT 1")
        }.getMessage
        assert(e.contains("It is not allowed to add database prefix ") &&
          e.contains("to the table name in CACHE TABLE AS SELECT"))
      }
    }
  }

  test("SPARK-11246 cache parquet table") {
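    // Regression test for SPARK-11246: scans of a cached Parquet-backed Hive
    // table should be served by exactly one InMemoryTableScanExec.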
    sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")

    cacheTable("cachedTable")
    val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan
    assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 1)

    sql("DROP TABLE cachedTable")
  }

  test("cache a table using CatalogFileIndex") {
    withTable("test") {
      sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
      val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
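      // The third constructor argument is the table's size in bytes; the
      // exact value does not matter for this test.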
      val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)

      val dataSchema = StructType(tableMeta.schema.filterNot { f =>
        tableMeta.partitionColumnNames.contains(f.name)
      })
      val relation = HadoopFsRelation(
        location = catalogFileIndex,
        partitionSchema = tableMeta.partitionSchema,
        dataSchema = dataSchema,
        bucketSpec = None,
        fileFormat = new ParquetFileFormat(),
        options = Map.empty)(sparkSession = spark)

      val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
      spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))

      assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)

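      // A logically identical relation built from a fresh CatalogFileIndex
      // should hit the same cache entry.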
      val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0)
      val sameRelation = HadoopFsRelation(
        location = sameCatalog,
        partitionSchema = tableMeta.partitionSchema,
        dataSchema = dataSchema,
        bucketSpec = None,
        fileFormat = new ParquetFileFormat(),
        options = Map.empty)(sparkSession = spark)
      val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))

      assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
    }
  }
}