/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand

/**
 * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
 * Writing to dynamic partitions is also supported.
 *
 * @param outputPath root directory of the destination. It is qualified against its
 *                   [[FileSystem]] for the existence check, for partition deletion and for the
 *                   writer's output spec.
 * @param staticPartitionKeys partial partitioning spec for write. This defines the scope of
 *                            partition overwrites: when the spec is empty, all partitions are
 *                            overwritten. When it covers a prefix of the partition keys, only
 *                            partitions matching the prefix are overwritten. Every key must name
 *                            one of `partitionColumns`, and the keys together must form a prefix
 *                            of them; this is verified before anything is deleted.
 * @param customPartitionLocations mapping of partition specs to their custom locations. The
 *                                 caller should guarantee that exactly those table partitions
 *                                 falling under the specified static partition keys are contained
 *                                 in this map, and that no other partitions are.
 * @param partitionColumns partition columns of the output, in the order used to build the
 *                         `name=value` directory prefix for static partitions.
 * @param bucketSpec optional bucketing specification, forwarded to [[FileFormatWriter]].
 * @param fileFormat format used to write the output files.
 * @param refreshFunction callback forwarded to [[FileFormatWriter]]; it receives a sequence of
 *                        partition specs (presumably the partitions touched by the write --
 *                        confirm in FileFormatWriter).
 * @param options data source options; used to build the Hadoop configuration and forwarded to
 *                the writer.
 * @param query logical plan producing the rows to write.
 * @param mode what to do when `outputPath` already exists (see `run`).
 * @param catalogTable catalog table backing the destination, if any. Not read by the logic in
 *                     this class.
 */
case class InsertIntoHadoopFsRelationCommand(
    outputPath: Path,
    staticPartitionKeys: TablePartitionSpec,
    customPartitionLocations: Map[TablePartitionSpec, String],
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    refreshFunction: Seq[TablePartitionSpec] => Unit,
    options: Map[String, String],
    @transient query: LogicalPlan,
    mode: SaveMode,
    catalogTable: Option[CatalogTable])
  extends RunnableCommand {

  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

  // `query` is exposed as an inner child rather than a regular child (presumably so it still
  // shows up when the plan is rendered).
  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil

  /**
   * Writes the rows produced by `query` under `outputPath`, honoring `mode`:
   *  - `ErrorIfExists`: fails if the path exists, otherwise writes.
   *  - `Overwrite`: if the path exists, first deletes the partitions selected by
   *    `staticPartitionKeys` and every entry of `customPartitionLocations`, then writes.
   *  - `Append`: writes; the committer is told it is appending when the path already exists.
   *  - `Ignore`: writes only if the path does not exist yet.
   *
   * @return always an empty sequence.
   * @throws AnalysisException if the query output has duplicate column names, or if the path
   *                           exists and `mode` is `ErrorIfExists`.
   */
  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Most formats don't do well with duplicate columns, so lets not allow that
    val fieldNames = query.schema.fieldNames
    if (fieldNames.length != fieldNames.distinct.length) {
      val duplicateColumns = fieldNames.groupBy(identity).collect {
        case (x, ys) if ys.length > 1 => "\"" + x + "\""
      }.mkString(", ")
      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
        s"cannot save to file.")
    }

    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
    val fs = outputPath.getFileSystem(hadoopConf)
    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

    val pathExists = fs.exists(qualifiedOutputPath)
    val doInsertion = (mode, pathExists) match {
      case (SaveMode.ErrorIfExists, true) =>
        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
      case (SaveMode.Overwrite, true) =>
        // Deletion happens eagerly, before the write job is even set up.
        deleteMatchingPartitions(fs, qualifiedOutputPath)
        true
      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
        true
      case (SaveMode.Ignore, exists) =>
        !exists
      case (s, exists) =>
        // Defensive: every SaveMode value is handled above.
        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
    }
    // If we are appending data to an existing dir.
    val isAppend = pathExists && (mode == SaveMode.Append)

    if (doInsertion) {
      // NOTE(review): the committer receives the unqualified `outputPath`, while the writer's
      // output spec uses `qualifiedOutputPath` -- confirm this asymmetry is intended.
      val committer = FileCommitProtocol.instantiate(
        sparkSession.sessionState.conf.fileCommitProtocolClass,
        jobId = java.util.UUID.randomUUID().toString,
        outputPath = outputPath.toString,
        isAppend = isAppend)

      FileFormatWriter.write(
        sparkSession = sparkSession,
        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(
          qualifiedOutputPath.toString, customPartitionLocations),
        hadoopConf = hadoopConf,
        partitionColumns = partitionColumns,
        bucketSpec = bucketSpec,
        refreshFunction = refreshFunction,
        options = options)
    } else {
      logInfo("Skipping insertion into a relation that already exists.")
    }

    Seq.empty[Row]
  }

  /**
   * Deletes all partition files that match the specified static prefix. Partitions with custom
   * locations are also cleared based on the custom locations map given to this class.
   *
   * @throws IllegalArgumentException if a static partition key does not name a partition column,
   *                                  or if the static keys do not form a prefix of
   *                                  `partitionColumns`. Without this check, such a spec either
   *                                  deletes nothing (silently appending instead of overwriting)
   *                                  or degenerates to a `"/"` prefix. Hadoop `Path` normalizes
   *                                  that prefix away, so the whole output directory is removed.
   * @throws IOException if an existing directory could not be deleted.
   */
  private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
    // Leading run of partition columns that carry a static value (e.g. `foo` for foo=1/bar=*).
    val staticColumns = partitionColumns.takeWhile(p => staticPartitionKeys.contains(p.name))
    require(staticColumns.length == staticPartitionKeys.size,
      s"Static partition keys ${staticPartitionKeys.keys.mkString("[", ", ", "]")} must name " +
        s"a prefix of the partition columns ${partitionColumns.map(_.name).mkString("[", ", ", "]")}")
    // e.g. "/foo=1/bar=2"; empty when there are no static keys (the whole output is cleared).
    val staticPartitionPrefix = staticColumns.map { p =>
      "/" + escapePathName(p.name) + "=" + escapePathName(staticPartitionKeys(p.name))
    }.mkString
    // first clear the path determined by the static partition keys (e.g. /table/foo=1)
    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
    if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
      throw new IOException(s"Unable to clear output " +
        s"directory $staticPrefixPath prior to writing to it")
    }
    // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
    for ((spec, customLoc) <- customPartitionLocations) {
      assert(
        (staticPartitionKeys.toSet -- spec).isEmpty,
        "Custom partition location did not match static partitioning keys")
      val path = new Path(customLoc)
      if (fs.exists(path) && !fs.delete(path, true)) {
        throw new IOException(s"Unable to clear partition " +
          s"directory $path prior to writing to it")
      }
    }
  }
}