1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.execution.command
19
20import org.apache.spark.sql.{Dataset, Row, SparkSession}
21import org.apache.spark.sql.catalyst.TableIdentifier
22import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
23import org.apache.spark.sql.catalyst.plans.QueryPlan
24import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
25
/**
 * Command that marks a table for caching through the session catalog.
 *
 * When `plan` is defined, the query is first registered as a temporary view under the
 * quoted form of `tableIdent`, and that view is what gets cached. Unless `isLazy` is set,
 * a `count()` is run over the table right away to perform the caching eagerly.
 *
 * @param tableIdent identifier of the table (or of the temporary view to create) to cache
 * @param plan       optional query whose result is registered as a temporary view first
 * @param isLazy     when true, the eager `count()` over the table is skipped
 */
case class CacheTableCommand(
    tableIdent: TableIdentifier,
    plan: Option[LogicalPlan],
    isLazy: Boolean) extends RunnableCommand {
  // A query and a database-qualified name must not be supplied together.
  require(!(plan.nonEmpty && tableIdent.database.nonEmpty),
    "Database name is not allowed in CACHE TABLE AS SELECT")

  /** Exposes the optional query as an inner child of this command. */
  override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq

  /**
   * Registers the optional query as a temporary view, caches the table via the catalog and,
   * in eager mode, runs a count over it.
   *
   * @param sparkSession session whose catalog is used for the view and the cache
   * @return always an empty sequence of rows
   */
  override def run(sparkSession: SparkSession): Seq[Row] = {
    val quotedName = tableIdent.quotedString

    // With a query present, register it under the given name before caching.
    for (query <- plan) {
      Dataset.ofRows(sparkSession, query).createTempView(quotedName)
    }
    sparkSession.catalog.cacheTable(quotedName)

    val eager = !isLazy
    if (eager) {
      // The count is executed only to perform the caching now; its result is discarded.
      sparkSession.table(tableIdent).count()
    }

    Seq.empty[Row]
  }
}
51
52
/**
 * Command that removes a table from the cache through the session catalog.
 *
 * @param tableIdent identifier of the table to uncache
 * @param ifExists   when true, a [[NoSuchTableException]] raised by the catalog is ignored;
 *                   when false, it propagates to the caller
 */
case class UncacheTableCommand(
    tableIdent: TableIdentifier,
    ifExists: Boolean) extends RunnableCommand {

  /**
   * Asks the catalog to uncache the table named by the quoted form of `tableIdent`.
   *
   * @param sparkSession session whose catalog performs the uncache
   * @return always an empty sequence of rows
   */
  override def run(sparkSession: SparkSession): Seq[Row] = {
    try sparkSession.catalog.uncacheTable(tableIdent.quotedString)
    catch {
      // Only a missing table is tolerated, and only when `ifExists` was requested.
      case _: NoSuchTableException if ifExists => ()
    }
    Seq.empty[Row]
  }
}
67
/**
 * Command that clears all cached data from the in-memory cache by delegating to the
 * session catalog's `clearCache()`.
 */
case object ClearCacheCommand extends RunnableCommand {

  /**
   * Clears the cache through the catalog of the given session.
   *
   * @param sparkSession session whose catalog is asked to clear the cache
   * @return always an empty sequence of rows
   */
  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.catalog
    catalog.clearCache()
    Seq.empty[Row]
  }
}
78