/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand

/**
 * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and
 * appending, as well as writing to dynamic partitions.
 *
 * @param staticPartitionKeys partial partitioning spec for the write. This defines the scope of
 *                            partition overwrites: when the spec is empty, all partitions are
 *                            overwritten. When it covers a prefix of the partition keys, only
 *                            partitions matching the prefix are overwritten.
 * @param customPartitionLocations mapping of partition specs to their custom locations. The
 *                                 caller should guarantee that exactly those table partitions
 *                                 falling under the specified static partition keys are contained
 *                                 in this map, and that no other partitions are.
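 *
 * For example (illustrative values), for a table partitioned by columns (a, b):
 * {{{
 *   Map.empty                            // overwrites every partition of the table
 *   Map("a" -> "1")                      // overwrites only partitions under a=1
 *   Map("a" -> "1", "b" -> "2")          // overwrites the single partition a=1/b=2
 * }}}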
 */
case class InsertIntoHadoopFsRelationCommand(
    outputPath: Path,
    staticPartitionKeys: TablePartitionSpec,
    customPartitionLocations: Map[TablePartitionSpec, String],
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    refreshFunction: Seq[TablePartitionSpec] => Unit,
    options: Map[String, String],
    @transient query: LogicalPlan,
    mode: SaveMode,
    catalogTable: Option[CatalogTable])
  extends RunnableCommand {

  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Most formats don't do well with duplicate columns, so let's not allow that
    if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) {
      val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
        case (x, ys) if ys.length > 1 => "\"" + x + "\""
      }.mkString(", ")
      throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " +
          s"cannot save to file.")
    }

    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
    val fs = outputPath.getFileSystem(hadoopConf)
    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

    val pathExists = fs.exists(qualifiedOutputPath)
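    // Decide whether to write at all: the save mode, combined with whether the output path
    // already exists, determines if we proceed, fail, or silently skip the write.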
    val doInsertion = (mode, pathExists) match {
      case (SaveMode.ErrorIfExists, true) =>
        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
      case (SaveMode.Overwrite, true) =>
        deleteMatchingPartitions(fs, qualifiedOutputPath)
        true
      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
        true
      case (SaveMode.Ignore, exists) =>
        !exists
      case (s, exists) =>
        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
    }
    // If we are appending data to an existing dir.
    val isAppend = pathExists && (mode == SaveMode.Append)

    if (doInsertion) {
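      // Instantiate the commit protocol configured for file sources; a random UUID is used as
      // the job ID, presumably so concurrent writes get distinct staging state.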
      val committer = FileCommitProtocol.instantiate(
        sparkSession.sessionState.conf.fileCommitProtocolClass,
        jobId = java.util.UUID.randomUUID().toString,
        outputPath = outputPath.toString,
        isAppend = isAppend)

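      // Hand the actual write off to FileFormatWriter, which plans and runs the write job,
      // handles dynamic partitioning and bucketing, and calls refreshFunction on completion.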
      FileFormatWriter.write(
        sparkSession = sparkSession,
        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(
          qualifiedOutputPath.toString, customPartitionLocations),
        hadoopConf = hadoopConf,
        partitionColumns = partitionColumns,
        bucketSpec = bucketSpec,
        refreshFunction = refreshFunction,
        options = options)
    } else {
      logInfo("Skipping insertion into a relation that already exists.")
    }

    Seq.empty[Row]
  }

  /**
   * Deletes all partition files that match the specified static prefix. Partitions with custom
   * locations are also cleared based on the custom locations map given to this class.
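   *
   * For example, with staticPartitionKeys = Map("a" -> "1"), everything under
   * <output path>/a=1 is removed, along with any custom locations registered for
   * partitions under a=1.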
   */
  private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
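    // Build the directory suffix for the static keys, walking partition columns in their
    // declared order so the suffix matches the on-disk layout (e.g. /a=1/b=2).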
    val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
      "/" + partitionColumns.flatMap { p =>
        staticPartitionKeys.get(p.name) match {
          case Some(value) =>
            Some(escapePathName(p.name) + "=" + escapePathName(value))
          case None =>
            None
        }
      }.mkString("/")
    } else {
      ""
    }
    // first clear the path determined by the static partition keys (e.g. /table/foo=1)
    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
    if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
      throw new IOException(s"Unable to clear output " +
        s"directory $staticPrefixPath prior to writing to it")
    }
    // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
    for ((spec, customLoc) <- customPartitionLocations) {
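      // Sanity check: every custom location in the map must belong to a partition that falls
      // under the static partition keys being overwritten.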
      assert(
        (staticPartitionKeys.toSet -- spec).isEmpty,
        "Custom partition location did not match static partitioning keys")
      val path = new Path(customLoc)
      if (fs.exists(path) && !fs.delete(path, true)) {
        throw new IOException(s"Unable to clear partition " +
          s"directory $path prior to writing to it")
      }
    }
  }
}