/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.util.SerializableJobConf


/**
 * Command for writing data out to a Hive table.
 *
 * This class is mostly a mess, for legacy reasons (since it evolved in organic ways and had to
 * follow Hive's internal implementations closely, which themselves were a mess too). Please don't
 * blame Reynold for this! He was just moving code around!
 *
 * In the future we should converge the write path for Hive with the normal data source write path,
 * as defined in [[org.apache.spark.sql.execution.datasources.FileFormatWriter]].
 *
 * @param table the logical plan representing the table. In the future this should be a
 *              [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
 *              and data source tables.
 * @param partition a map from the partition key to the partition value (optional). If a
 *                  partition value is `None`, dynamic partition insert will be performed.
 *                  As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
 *
 *                  {{{
 *                  Map('a' -> Some('1'), 'b' -> Some('2'))
 *                  }}}
 *
 *                  and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
 *                  would have
 *
 *                  {{{
 *                  Map('a' -> Some('1'), 'b' -> None)
 *                  }}}.
 * @param child the physical plan that produces the rows to write.
 * @param overwrite whether to overwrite the existing table or partitions.
 * @param ifNotExists if true, only write if the table or partition does not exist.
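 *
 * For example, `INSERT OVERWRITE TABLE tbl PARTITION (a=1, b) SELECT ...` is planned as an
 * [[InsertIntoHiveTable]] with `overwrite = true`, `ifNotExists = false` and
 * `partition = Map('a' -> Some('1'), 'b' -> None)`.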
 */
case class InsertIntoHiveTable(
    table: MetastoreRelation,
    partition: Map[String, Option[String]],
    child: SparkPlan,
    overwrite: Boolean,
    ifNotExists: Boolean) extends UnaryExecNode {

  @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
  @transient private val externalCatalog = sqlContext.sharedState.externalCatalog

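  // INSERT produces no rows; all the work happens as a side effect of executing this node.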
  def output: Seq[Attribute] = Seq.empty

  val hadoopConf = sessionState.newHadoopConf()
  var createdTempDir: Option[Path] = None
  val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

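  // Generates a unique id used to name the staging/scratch directory, mirroring Hive's
  // Context implementation.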
  private def executionId: String = {
    val rand: Random = new Random
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
  }

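  // Returns a unique ".hive-staging" directory under the given path, creating it eagerly and
  // registering it for deletion when the JVM exits.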
  private def getStagingDir(inputPath: Path): Path = {
    val inputPathUri: URI = inputPath.toUri
    val inputPathName: String = inputPathUri.getPath
    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    val stagingPathName: String =
      if (inputPathName.indexOf(stagingDir) == -1) {
        new Path(inputPathName, stagingDir).toString
      } else {
        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
      }
    val dir: Path =
      fs.makeQualified(
        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    try {
      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
      }
      createdTempDir = Some(dir)
      fs.deleteOnExit(dir)
    } catch {
      case e: IOException =>
        throw new RuntimeException(
          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
    }
    dir
  }

  private def getExternalScratchDir(extURI: URI): Path = {
    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
  }

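  // Picks a temporary output location whose layout matches what the configured Hive client
  // version expects, so that the final move into the table directory behaves like Hive's.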
  def getExternalTmpPath(path: Path): Path = {
    import org.apache.spark.sql.hive.client.hive._

    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
    // a common scratch directory. After the writing is finished, Hive will simply empty the table
    // directory and move the staging directory to it.
    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
    // moving the staging directory to the table directory, Hive will still empty the table
    // directory, but will exclude the staging directory there.
    // We have to follow the Hive behavior here, to avoid trouble. For example, if we create the
    // staging directory under the table directory for Hive prior to 1.1, the staging directory
    // will be removed by Hive when Hive is trying to empty the table directory.
    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
      oldVersionExternalTempPath(path)
    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
      newVersionExternalTempPath(path)
    } else {
      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
    }
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
  def oldVersionExternalTempPath(path: Path): Path = {
    val extURI: URI = path.toUri
    val scratchPath = new Path(scratchDir, executionId)
    var dirPath = new Path(
      extURI.getScheme,
      extURI.getAuthority,
      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())

    try {
      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
      dirPath = new Path(fs.makeQualified(dirPath).toString())

      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
      }
      createdTempDir = Some(dirPath)
      fs.deleteOnExit(dirPath)
    } catch {
      case e: IOException =>
        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
    }
    dirPath
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
  def newVersionExternalTempPath(path: Path): Path = {
    val extURI: URI = path.toUri
    if (extURI.getScheme == "viewfs") {
      getExtTmpPathRelTo(path.getParent)
    } else {
      new Path(getExternalScratchDir(extURI), "-ext-10000")
    }
  }

  def getExtTmpPathRelTo(path: Path): Path = {
    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
  }

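  // Configures the Hadoop JobConf (output value class, output format, output path) and then runs
  // a Spark job that writes every partition of the RDD through the given writer container.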
  private def saveAsHiveFile(
      rdd: RDD[InternalRow],
      valueClass: Class[_],
      fileSinkConf: FileSinkDesc,
      conf: SerializableJobConf,
      writerContainer: SparkHiveWriterContainer): Unit = {
    assert(valueClass != null, "Output value class not set")
    conf.value.setOutputValueClass(valueClass)

    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
    assert(outputFileFormatClassName != null, "Output format class not set")
    conf.value.set("mapred.output.format.class", outputFileFormatClassName)

    FileOutputFormat.setOutputPath(
      conf.value,
      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
    writerContainer.driverSideSetup()
    sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
    writerContainer.commitJob()
  }

  /**
   * Inserts all rows produced by the child plan into the Hive table. Row objects are properly
   * serialized with the `org.apache.hadoop.hive.serde2.SerDe` and the
   * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
   *
   * Note: this is run once and then kept to avoid double insertions.
   */
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
    // instances within the closure, since Serializer is not serializable while TableDesc is.
    val tableDesc = table.tableDesc
    val tableLocation = table.hiveQlTable.getDataLocation
    val tmpLocation = getExternalTmpPath(tableLocation)
    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

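    // Propagate the output compression settings from the Hadoop configuration into the
    // FileSinkDesc so the writers produce compressed files.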
    if (isCompressed) {
      // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec"
      // and "mapred.output.compression.type" have no impact on ORC because it uses table
      // properties to store compression information.
      hadoopConf.set("mapred.output.compress", "true")
      fileSinkConf.setCompressed(true)
      fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
      fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
    }

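    // Split the requested partition spec into dynamic partitions (no value given) and static
    // partitions (value given), keeping the textual spec that Hive expects.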
    val numDynamicPartitions = partition.values.count(_.isEmpty)
    val numStaticPartitions = partition.values.count(_.nonEmpty)
    val partitionSpec = partition.map {
      case (key, Some(value)) => key -> value
      case (key, None) => key -> ""
    }

    // All partition column names in the format of "<column name 1>/<column name 2>/..."
    val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
    val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)

    // By this time, the partition map must match the table's partition columns
    if (partitionColumnNames.toSet != partition.keySet) {
      throw new SparkException(
        s"""Requested partitioning does not match the ${table.tableName} table:
           |Requested partitions: ${partition.keys.mkString(",")}
           |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin)
    }

    // Validate the partition spec if there are any dynamic partitions
    if (numDynamicPartitions > 0) {
      // Report error if dynamic partitioning is not enabled
      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
      }

      // Report error if dynamic partition strict mode is on but no static partition is found
      if (numStaticPartitions == 0 &&
        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
      }

      // Report error if any static partition appears after a dynamic partition
      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
      }
    }

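    // Freeze the Hadoop configuration into a serializable JobConf so it can be shipped to the
    // executors with the write tasks.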
    val jobConf = new JobConf(hadoopConf)
    val jobConfSer = new SerializableJobConf(jobConf)

    // When speculation is on and the output committer class name contains "Direct", warn users
    // that they may lose data if they are using a direct output committer.
    val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

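    // Use a writer container that is aware of dynamic partitions when the query writes to any;
    // otherwise the basic container writes everything to a single location.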
    val writerContainer = if (numDynamicPartitions > 0) {
      val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
      new SparkHiveDynamicPartitionWriterContainer(
        jobConf,
        fileSinkConf,
        dynamicPartColNames,
        child.output)
    } else {
      new SparkHiveWriterContainer(
        jobConf,
        fileSinkConf,
        child.output)
    }

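    // Determine the serialized output class from the table's SerDe, then run the actual write
    // job into the temporary location.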
    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

    val outputPath = FileOutputFormat.getOutputPath(jobConf)
    // TODO: Correctly set holdDDLTime.
    // Most of the time, we should have holdDDLTime = false.
    // holdDDLTime will be true when TOK_HOLD_DDLTIME is present in the query as a hint.
    val holdDDLTime = false
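    // Register the written files with the metastore: load them into the target partition(s) if
    // the table is partitioned, otherwise load them into the table directly.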
    if (partition.nonEmpty) {
      if (numDynamicPartitions > 0) {
        externalCatalog.loadDynamicPartitions(
          db = table.catalogTable.database,
          table = table.catalogTable.identifier.table,
          outputPath.toString,
          partitionSpec,
          overwrite,
          numDynamicPartitions,
          holdDDLTime = holdDDLTime)
      } else {
        // scalastyle:off
        // ifNotExists is only valid with static partitions, refer to
        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
        // scalastyle:on
        val oldPart =
          externalCatalog.getPartitionOption(
            table.catalogTable.database,
            table.catalogTable.identifier.table,
            partitionSpec)

        var doHiveOverwrite = overwrite

        if (oldPart.isEmpty || !ifNotExists) {
          // SPARK-18107: Insert overwrite runs much slower than hive-client.
          // Newer Hive largely improves insert overwrite performance. As Spark uses an older Hive
          // version and we may not want to catch up with new Hive versions every time, we delete
          // the Hive partition first and then load the data files into the Hive partition.
          if (oldPart.nonEmpty && overwrite) {
            oldPart.get.storage.locationUri.foreach { uri =>
              val partitionPath = new Path(uri)
              val fs = partitionPath.getFileSystem(hadoopConf)
              if (fs.exists(partitionPath)) {
                if (!fs.delete(partitionPath, true)) {
                  throw new RuntimeException(
                    "Cannot remove partition directory '" + partitionPath.toString + "'")
                }
                // Don't let Hive do the overwrite operation since it is slower.
                doHiveOverwrite = false
              }
            }
          }

          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
          // which is currently considered as a Hive native command.
          val inheritTableSpecs = true
          externalCatalog.loadPartition(
            table.catalogTable.database,
            table.catalogTable.identifier.table,
            outputPath.toString,
            partitionSpec,
            isOverwrite = doHiveOverwrite,
            holdDDLTime = holdDDLTime,
            inheritTableSpecs = inheritTableSpecs)
        }
      }
    } else {
      externalCatalog.loadTable(
        table.catalogTable.database,
        table.catalogTable.identifier.table,
        outputPath.toString, // TODO: URI
        overwrite,
        holdDDLTime)
    }

    // Attempt to delete the staging directory and the files it contains. If this fails, the files
    // are expected to be dropped on normal termination of the VM since deleteOnExit is used.
    try {
      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
    } catch {
      case NonFatal(e) =>
        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
    }

    // Un-cache this table and refresh its metadata so later reads see the newly written data.
    sqlContext.sparkSession.catalog.uncacheTable(table.catalogTable.identifier.quotedString)
    sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

    // It would be nice to just return the childRdd unchanged so insert operations could be
    // chained. However, for now we return an empty list to simplify compatibility checks with
    // Hive, which does not return anything for insert operations.
    // TODO: implement hive compatibility as rules.
    Seq.empty[InternalRow]
  }

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

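  // Executing this node forces sideEffectResult (running the insert exactly once) and returns an
  // empty RDD, matching Hive's behavior of producing no rows for INSERT.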
  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
  }
}