1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.sql.execution.command 19 20import org.apache.spark.sql.{Dataset, Row, SparkSession} 21import org.apache.spark.sql.catalyst.TableIdentifier 22import org.apache.spark.sql.catalyst.analysis.NoSuchTableException 23import org.apache.spark.sql.catalyst.plans.QueryPlan 24import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 25 26case class CacheTableCommand( 27 tableIdent: TableIdentifier, 28 plan: Option[LogicalPlan], 29 isLazy: Boolean) extends RunnableCommand { 30 require(plan.isEmpty || tableIdent.database.isEmpty, 31 "Database name is not allowed in CACHE TABLE AS SELECT") 32 33 override protected def innerChildren: Seq[QueryPlan[_]] = { 34 plan.toSeq 35 } 36 37 override def run(sparkSession: SparkSession): Seq[Row] = { 38 plan.foreach { logicalPlan => 39 Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString) 40 } 41 sparkSession.catalog.cacheTable(tableIdent.quotedString) 42 43 if (!isLazy) { 44 // Performs eager caching 45 sparkSession.table(tableIdent).count() 46 } 47 48 Seq.empty[Row] 49 } 50} 51 52 53case class UncacheTableCommand( 54 tableIdent: TableIdentifier, 55 ifExists: Boolean) extends RunnableCommand { 56 57 override def run(sparkSession: SparkSession): Seq[Row] = { 58 val tableId = tableIdent.quotedString 59 try { 60 sparkSession.catalog.uncacheTable(tableId) 61 } catch { 62 case _: NoSuchTableException if ifExists => // don't throw 63 } 64 Seq.empty[Row] 65 } 66} 67 68/** 69 * Clear all cached data from the in-memory cache. 70 */ 71case object ClearCacheCommand extends RunnableCommand { 72 73 override def run(sparkSession: SparkSession): Seq[Row] = { 74 sparkSession.catalog.clearCache() 75 Seq.empty[Row] 76 } 77} 78