/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
import org.apache.spark.util.SerializableJobConf


/**
 * Command for writing data out to a Hive table.
 *
 * This class is mostly a mess, for legacy reasons (since it evolved in organic ways and had to
 * follow Hive's internal implementations closely, which were a mess themselves). Please don't
 * blame Reynold for this! He was just moving code around!
 *
 * In the future we should converge the write path for Hive with the normal data source write path,
 * as defined in [[org.apache.spark.sql.execution.datasources.FileFormatWriter]].
 *
 * @param table the logical plan representing the table. In the future this should be a
 *              [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
 *              and data source tables.
 * @param partition a map from the partition key to the partition value (optional). If a partition
 *                  value is not given (i.e. it is `None`), a dynamic partition insert will be
 *                  performed for that key.
 *                  As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
 *
 *                  {{{
 *                  Map('a' -> Some('1'), 'b' -> Some('2'))
 *                  }}}
 *
 *                  and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
 *                  would have
 *
 *                  {{{
 *                  Map('a' -> Some('1'), 'b' -> None)
 *                  }}}.
 * @param child the physical plan producing the data to write.
 * @param overwrite overwrite existing table or partitions.
 * @param ifNotExists If true, only write if the table or partition does not exist.
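 * @note This physical node produces no output rows: the write itself happens as a side effect
 *       when the plan is executed, and the lazy `sideEffectResult` below ensures the insertion
 *       is performed only once.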
 */
case class InsertIntoHiveTable(
    table: MetastoreRelation,
    partition: Map[String, Option[String]],
    child: SparkPlan,
    overwrite: Boolean,
    ifNotExists: Boolean) extends UnaryExecNode {

  @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
  @transient private val externalCatalog = sqlContext.sharedState.externalCatalog

  def output: Seq[Attribute] = Seq.empty

  val hadoopConf = sessionState.newHadoopConf()
  var createdTempDir: Option[Path] = None
  val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

  private def executionId: String = {
    val rand: Random = new Random
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
  }

  private def getStagingDir(inputPath: Path): Path = {
    val inputPathUri: URI = inputPath.toUri
    val inputPathName: String = inputPathUri.getPath
    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    val stagingPathName: String =
      if (inputPathName.indexOf(stagingDir) == -1) {
        new Path(inputPathName, stagingDir).toString
      } else {
        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
      }
    val dir: Path =
      fs.makeQualified(
        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    try {
      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
      }
      createdTempDir = Some(dir)
      fs.deleteOnExit(dir)
    } catch {
      case e: IOException =>
        throw new RuntimeException(
          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
    }
    dir
  }

  private def getExternalScratchDir(extURI: URI): Path = {
    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
  }

  def getExternalTmpPath(path: Path): Path = {
    import org.apache.spark.sql.hive.client.hive._

    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
    // a common scratch directory. After the writing is finished, Hive will simply empty the table
    // directory and move the staging directory to it.
    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
    // moving the staging directory to the table directory, Hive will still empty the table
    // directory, but will exclude the staging directory there.
    // We have to follow the Hive behavior here, to avoid trouble. For example, if we create the
    // staging directory under the table directory for Hive prior to 1.1, the staging directory
    // will be removed by Hive when Hive is trying to empty the table directory.
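    // For illustration, with the defaults above a Hive 1.1+ staging location ends up looking
    // roughly like <table dir>/.hive-staging_hive_<timestamp>_<random>-<task id>/-ext-10000,
    // where the name is derived from `stagingDir`, `executionId` and TaskRunner.getTaskRunnerID.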
    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
      oldVersionExternalTempPath(path)
    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
      newVersionExternalTempPath(path)
    } else {
      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
    }
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
  def oldVersionExternalTempPath(path: Path): Path = {
    val extURI: URI = path.toUri
    val scratchPath = new Path(scratchDir, executionId)
    var dirPath = new Path(
      extURI.getScheme,
      extURI.getAuthority,
      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())

    try {
      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
      dirPath = new Path(fs.makeQualified(dirPath).toString())

      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
      }
      createdTempDir = Some(dirPath)
      fs.deleteOnExit(dirPath)
    } catch {
      case e: IOException =>
        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
    }
    dirPath
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
  def newVersionExternalTempPath(path: Path): Path = {
    val extURI: URI = path.toUri
    if (extURI.getScheme == "viewfs") {
      getExtTmpPathRelTo(path.getParent)
    } else {
      new Path(getExternalScratchDir(extURI), "-ext-10000")
    }
  }

  def getExtTmpPathRelTo(path: Path): Path = {
    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
  }

  private def saveAsHiveFile(
      rdd: RDD[InternalRow],
      valueClass: Class[_],
      fileSinkConf: FileSinkDesc,
      conf: SerializableJobConf,
      writerContainer: SparkHiveWriterContainer): Unit = {
    assert(valueClass != null, "Output value class not set")
    conf.value.setOutputValueClass(valueClass)

    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
    assert(outputFileFormatClassName != null, "Output format class not set")
    conf.value.set("mapred.output.format.class", outputFileFormatClassName)

    FileOutputFormat.setOutputPath(
      conf.value,
      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
    writerContainer.driverSideSetup()
    sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
    writerContainer.commitJob()
  }

  /**
   * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
   * `org.apache.hadoop.hive.serde2.SerDe` and the
   * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
   *
   * Note: this is run once and then kept to avoid double insertions.
   */
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
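    // Overall flow: resolve a temporary staging location, write the child's rows there via
    // saveAsHiveFile, ask the metastore to load the staged files into the target table or
    // partition(s), and finally clean up the staging directory and refresh the cached catalog
    // state for this table.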
    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
    // instances within the closure, since Serializer is not serializable while TableDesc is.
    val tableDesc = table.tableDesc
    val tableLocation = table.hiveQlTable.getDataLocation
    val tmpLocation = getExternalTmpPath(tableLocation)
    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

    if (isCompressed) {
      // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
      // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
      // to store compression information.
      hadoopConf.set("mapred.output.compress", "true")
      fileSinkConf.setCompressed(true)
      fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
      fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
    }

    val numDynamicPartitions = partition.values.count(_.isEmpty)
    val numStaticPartitions = partition.values.count(_.nonEmpty)
    val partitionSpec = partition.map {
      case (key, Some(value)) => key -> value
      case (key, None) => key -> ""
    }

    // All partition column names in the format of "<column name 1>/<column name 2>/..."
    val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
    val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)

    // By this time, the partition map must match the table's partition columns
    if (partitionColumnNames.toSet != partition.keySet) {
      throw new SparkException(
        s"""Requested partitioning does not match the ${table.tableName} table:
           |Requested partitions: ${partition.keys.mkString(",")}
           |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin)
    }

    // Validate the partition spec if there are any dynamic partitions
    if (numDynamicPartitions > 0) {
      // Report error if dynamic partitioning is not enabled
      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
      }

      // Report error if dynamic partition strict mode is on but no static partition is found
      if (numStaticPartitions == 0 &&
        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
      }

      // Report error if any static partition appears after a dynamic partition
      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
      }
    }

    val jobConf = new JobConf(hadoopConf)
    val jobConfSer = new SerializableJobConf(jobConf)

    // When speculation is on and the output committer class name contains "Direct", we should warn
    // users that they may lose data if they are using a direct output committer.
    val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). " +
          "If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    val writerContainer = if (numDynamicPartitions > 0) {
      val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
      new SparkHiveDynamicPartitionWriterContainer(
        jobConf,
        fileSinkConf,
        dynamicPartColNames,
        child.output)
    } else {
      new SparkHiveWriterContainer(
        jobConf,
        fileSinkConf,
        child.output)
    }

    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

    val outputPath = FileOutputFormat.getOutputPath(jobConf)
    // TODO: Correctly set holdDDLTime.
    // Most of the time we should have holdDDLTime = false.
    // holdDDLTime will be true when TOK_HOLD_DDLTIME is present in the query as a hint.
    val holdDDLTime = false
    if (partition.nonEmpty) {
      if (numDynamicPartitions > 0) {
        externalCatalog.loadDynamicPartitions(
          db = table.catalogTable.database,
          table = table.catalogTable.identifier.table,
          outputPath.toString,
          partitionSpec,
          overwrite,
          numDynamicPartitions,
          holdDDLTime = holdDDLTime)
      } else {
        // scalastyle:off
        // ifNotExists is only valid with static partitions, refer to
        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
        // scalastyle:on
        val oldPart =
          externalCatalog.getPartitionOption(
            table.catalogTable.database,
            table.catalogTable.identifier.table,
            partitionSpec)

        var doHiveOverwrite = overwrite

        if (oldPart.isEmpty || !ifNotExists) {
          // SPARK-18107: Insert overwrite runs much slower than hive-client.
          // Newer Hive largely improves insert overwrite performance. Since Spark uses an older
          // Hive version and we may not want to catch up with every new Hive release, we delete
          // the Hive partition first and then load the data files into the Hive partition.
          if (oldPart.nonEmpty && overwrite) {
            oldPart.get.storage.locationUri.foreach { uri =>
              val partitionPath = new Path(uri)
              val fs = partitionPath.getFileSystem(hadoopConf)
              if (fs.exists(partitionPath)) {
                if (!fs.delete(partitionPath, true)) {
                  throw new RuntimeException(
                    "Cannot remove partition directory '" + partitionPath.toString + "'")
                }
                // Don't let Hive do the overwrite operation since it is slower.
                doHiveOverwrite = false
              }
            }
          }

          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
          // which is currently considered as a Hive native command.
          val inheritTableSpecs = true
          externalCatalog.loadPartition(
            table.catalogTable.database,
            table.catalogTable.identifier.table,
            outputPath.toString,
            partitionSpec,
            isOverwrite = doHiveOverwrite,
            holdDDLTime = holdDDLTime,
            inheritTableSpecs = inheritTableSpecs)
        }
      }
    } else {
      externalCatalog.loadTable(
        table.catalogTable.database,
        table.catalogTable.identifier.table,
        outputPath.toString, // TODO: URI
        overwrite,
        holdDDLTime)
    }

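    // At this point the metastore load calls above have moved the staged files into the final
    // table/partition location, so anything left under the staging directory is temporary output.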
    // Attempt to delete the staging directory and the files it contains. If this fails, the files
    // are expected to be dropped at the normal termination of the VM since deleteOnExit is used.
    try {
      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
    } catch {
      case NonFatal(e) =>
        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
    }

    // Un-cache this table.
    sqlContext.sparkSession.catalog.uncacheTable(table.catalogTable.identifier.quotedString)
    sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

    // It would be nice to just return the childRdd unchanged so insert operations could be
    // chained; however, for now we return an empty list to simplify compatibility checks with
    // Hive, which does not return anything for insert operations.
    // TODO: implement Hive compatibility as rules.
    Seq.empty[InternalRow]
  }

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
  }
}