/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is arguably not intended to be used this way), and
// its members are private, so we need our own implementation in order to pass the
// schema to the writer.
private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
  var groupWriter: GroupWriter = null

  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
    groupWriter = new GroupWriter(recordConsumer, schema)
  }

  override def init(configuration: Configuration): WriteContext = {
    new WriteContext(schema, new java.util.HashMap[String, String]())
  }

  override def write(record: Group): Unit = {
    groupWriter.write(record)
  }
}

/**
 * A test suite that tests basic Parquet I/O.
 */
class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
  import testImplicits._

  /**
   * Writes `data` to a Parquet file, reads it back and checks the file contents.
   */
  protected def checkParquetFile[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = {
    withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
  }

81  test("basic data types (without binary)") {
82    val data = (1 to 4).map { i =>
83      (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
84    }
85    checkParquetFile(data)
86  }
87
88  test("raw binary") {
89    val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
90    withParquetDataFrame(data) { df =>
91      assertResult(data.map(_._1.mkString(",")).sorted) {
92        df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
93      }
94    }
95  }
96
97  test("SPARK-11694 Parquet logical types are not being tested properly") {
98    val parquetSchema = MessageTypeParser.parseMessageType(
99      """message root {
100        |  required int32 a(INT_8);
101        |  required int32 b(INT_16);
102        |  required int32 c(DATE);
103        |  required int32 d(DECIMAL(1,0));
104        |  required int64 e(DECIMAL(10,0));
105        |  required binary f(UTF8);
106        |  required binary g(ENUM);
107        |  required binary h(DECIMAL(32,0));
108        |  required fixed_len_byte_array(32) i(DECIMAL(32,0));
109        |}
110      """.stripMargin)
111
112    val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
113      DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0))
114
115    withTempPath { location =>
116      val path = new Path(location.getCanonicalPath)
117      val conf = spark.sessionState.newHadoopConf()
118      writeMetadata(parquetSchema, path, conf)
119      readParquetFile(path.toString)(df => {
120        val sparkTypes = df.schema.map(_.dataType)
121        assert(sparkTypes === expectedSparkTypes)
122      })
123    }
124  }
125
126  test("string") {
127    val data = (1 to 4).map(i => Tuple1(i.toString))
128    // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
129    // as we store Spark SQL schema in the extra metadata.
130    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data))
131    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
132  }
133
  testStandardAndLegacyModes("fixed-length decimals") {
    def makeDecimalRDD(decimal: DecimalType): DataFrame = {
      spark
        .range(1000)
        // Parquet doesn't allow column names with spaces, so we have to add an alias here.
        // Minus 500 here so that negative decimals are also tested.
        .select((('id - 500) / 100.0) cast decimal as 'dec)
        .coalesce(1)
    }

    val combinations = Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))
    for ((precision, scale) <- combinations) {
      withTempPath { dir =>
        val data = makeDecimalRDD(DecimalType(precision, scale))
        data.write.parquet(dir.getCanonicalPath)
        readParquetFile(dir.getCanonicalPath) { df => {
          checkAnswer(df, data.collect().toSeq)
        }}
      }
    }
  }

  test("date type") {
    def makeDateRDD(): DataFrame =
      sparkContext
        .parallelize(0 to 1000)
        .map(i => Tuple1(DateTimeUtils.toJavaDate(i)))
        .toDF()
        .select($"_1")

    withTempPath { dir =>
      val data = makeDateRDD()
      data.write.parquet(dir.getCanonicalPath)
      readParquetFile(dir.getCanonicalPath) { df =>
        checkAnswer(df, data.collect().toSeq)
      }
    }
  }

  testStandardAndLegacyModes("map") {
    val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
    checkParquetFile(data)
  }

  testStandardAndLegacyModes("array") {
    val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
    checkParquetFile(data)
  }

  testStandardAndLegacyModes("array and double") {
    val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble)))
    checkParquetFile(data)
  }

  testStandardAndLegacyModes("struct") {
    val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
    withParquetDataFrame(data) { df =>
      // Structs are converted to `Row`s
      checkAnswer(df, data.map { case Tuple1(struct) =>
        Row(Row(struct.productIterator.toSeq: _*))
      })
    }
  }

  testStandardAndLegacyModes("nested struct with array of array as field") {
    val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
    withParquetDataFrame(data) { df =>
      // Structs are converted to `Row`s
      checkAnswer(df, data.map { case Tuple1(struct) =>
        Row(Row(struct.productIterator.toSeq: _*))
      })
    }
  }

  testStandardAndLegacyModes("nested map with struct as value type") {
    val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
    withParquetDataFrame(data) { df =>
      checkAnswer(df, data.map { case Tuple1(m) =>
        Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
      })
    }
  }

  test("nulls") {
    val allNulls = (
      null.asInstanceOf[java.lang.Boolean],
      null.asInstanceOf[Integer],
      null.asInstanceOf[java.lang.Long],
      null.asInstanceOf[java.lang.Float],
      null.asInstanceOf[java.lang.Double])

    withParquetDataFrame(allNulls :: Nil) { df =>
      val rows = df.collect()
      assert(rows.length === 1)
      assert(rows.head === Row(Seq.fill(5)(null): _*))
    }
  }

  test("nones") {
    val allNones = (
      None.asInstanceOf[Option[Int]],
      None.asInstanceOf[Option[Long]],
      None.asInstanceOf[Option[String]])

    withParquetDataFrame(allNones :: Nil) { df =>
      val rows = df.collect()
      assert(rows.length === 1)
      assert(rows.head === Row(Seq.fill(3)(null): _*))
    }
  }

  test("SPARK-10113 Support for unsigned Parquet logical types") {
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 c(UINT_32);
        |}
      """.stripMargin)

    withTempPath { location =>
      val path = new Path(location.getCanonicalPath)
      val conf = spark.sessionState.newHadoopConf()
      writeMetadata(parquetSchema, path, conf)
      val errorMessage = intercept[Throwable] {
        spark.read.parquet(path.toString).printSchema()
      }.toString
      assert(errorMessage.contains("Parquet type not supported"))
    }
  }

  test("SPARK-11692 Support for Parquet logical types, JSON and BSON (embedded types)") {
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required binary a(JSON);
        |  required binary b(BSON);
        |}
      """.stripMargin)

    val expectedSparkTypes = Seq(StringType, BinaryType)

    withTempPath { location =>
      val path = new Path(location.getCanonicalPath)
      val conf = spark.sessionState.newHadoopConf()
      writeMetadata(parquetSchema, path, conf)
      val sparkTypes = spark.read.parquet(path.toString).schema.map(_.dataType)
      assert(sparkTypes === expectedSparkTypes)
    }
  }

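  // Checks that the codec configured via spark.sql.parquet.compression.codec is the codec
  // actually used for every column chunk of the files written by Spark.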
282  test("compression codec") {
283    val hadoopConf = spark.sessionState.newHadoopConf()
284    def compressionCodecFor(path: String, codecName: String): String = {
285      val codecs = for {
286        footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
287        block <- footer.getParquetMetadata.getBlocks.asScala
288        column <- block.getColumns.asScala
289      } yield column.getCodec.name()
290
291      assert(codecs.distinct === Seq(codecName))
292      codecs.head
293    }
294
295    val data = (0 until 10).map(i => (i, i.toString))
296
297    def checkCompressionCodec(codec: CompressionCodecName): Unit = {
298      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
299        withParquetFile(data) { path =>
300          assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase) {
301            compressionCodecFor(path, codec.name())
302          }
303        }
304      }
305    }
306
307    // Checks default compression codec
308    checkCompressionCodec(
309      CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
310
311    checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
312    checkCompressionCodec(CompressionCodecName.GZIP)
313    checkCompressionCodec(CompressionCodecName.SNAPPY)
314  }
315
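  // Writes a Parquet file directly through parquet-mr's example API, bypassing Spark's writer,
  // and verifies that Spark can read it back correctly.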
316  test("read raw Parquet file") {
317    def makeRawParquetFile(path: Path): Unit = {
318      val schema = MessageTypeParser.parseMessageType(
319        """
320          |message root {
321          |  required boolean _1;
322          |  required int32   _2;
323          |  required int64   _3;
324          |  required float   _4;
325          |  required double  _5;
326          |}
327        """.stripMargin)
328
329      val testWriteSupport = new TestGroupWriteSupport(schema)
      /**
       * Provides a builder for constructing a Parquet writer. After PARQUET-248, directly
       * constructing the writer is deprecated and should be done through a builder. The default
       * builders include Avro, but for raw Parquet writing we must create our own builder.
       */
      class ParquetWriterBuilder() extends
          ParquetWriter.Builder[Group, ParquetWriterBuilder](path) {
        override def getWriteSupport(conf: Configuration) = testWriteSupport

        override def self() = this
      }

      val writer = new ParquetWriterBuilder().build()

      (0 until 10).foreach { i =>
        val record = new SimpleGroup(schema)
        record.add(0, i % 2 == 0)
        record.add(1, i)
        record.add(2, i.toLong)
        record.add(3, i.toFloat)
        record.add(4, i.toDouble)
        writer.write(record)
      }

      writer.close()
    }

    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "part-r-0.parquet")
      makeRawParquetFile(path)
      readParquetFile(path.toString) { df =>
        checkAnswer(df, (0 until 10).map { i =>
          Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) })
      }
    }
  }

  test("write metadata") {
    val hadoopConf = spark.sessionState.newHadoopConf()
    withTempPath { file =>
      val path = new Path(file.toURI.toString)
      val fs = FileSystem.getLocal(hadoopConf)
      val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)])
      writeMetadata(schema, path, hadoopConf)

      assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
      assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))

      val expectedSchema = new ParquetSchemaConverter().convert(schema)
      val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema

      actualSchema.checkContains(expectedSchema)
      expectedSchema.checkContains(actualSchema)
    }
  }

  test("save - overwrite") {
    withParquetFile((1 to 10).map(i => (i, i.toString))) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      newData.toDF().write.format("parquet").mode(SaveMode.Overwrite).save(file)
      readParquetFile(file) { df =>
        checkAnswer(df, newData.map(Row.fromTuple))
      }
    }
  }

  test("save - ignore") {
    val data = (1 to 10).map(i => (i, i.toString))
    withParquetFile(data) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      newData.toDF().write.format("parquet").mode(SaveMode.Ignore).save(file)
      readParquetFile(file) { df =>
        checkAnswer(df, data.map(Row.fromTuple))
      }
    }
  }

  test("save - throw") {
    val data = (1 to 10).map(i => (i, i.toString))
    withParquetFile(data) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      val errorMessage = intercept[Throwable] {
        newData.toDF().write.format("parquet").mode(SaveMode.ErrorIfExists).save(file)
      }.getMessage
      assert(errorMessage.contains("already exists"))
    }
  }

  test("save - append") {
    val data = (1 to 10).map(i => (i, i.toString))
    withParquetFile(data) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      newData.toDF().write.format("parquet").mode(SaveMode.Append).save(file)
      readParquetFile(file) { df =>
        checkAnswer(df, (data ++ newData).map(Row.fromTuple))
      }
    }
  }

429  test("SPARK-6315 regression test") {
430    // Spark 1.1 and prior versions write Spark schema as case class string into Parquet metadata.
431    // This has been deprecated by JSON format since 1.2.  Notice that, 1.3 further refactored data
432    // types API, and made StructType.fields an array.  This makes the result of StructType.toString
433    // different from prior versions: there's no "Seq" wrapping the fields part in the string now.
434    val sparkSchema =
435      "StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))"
436
437    // The Parquet schema is intentionally made different from the Spark schema.  Because the new
438    // Parquet data source simply falls back to the Parquet schema once it fails to parse the Spark
439    // schema.  By making these two different, we are able to assert the old style case class string
440    // is parsed successfully.
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 c;
        |}
      """.stripMargin)

    withTempPath { location =>
      val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
      val path = new Path(location.getCanonicalPath)
      val conf = spark.sessionState.newHadoopConf()
      writeMetadata(parquetSchema, path, conf, extraMetadata)

      readParquetFile(path.toString) { df =>
        assertResult(df.schema) {
          StructType(
            StructField("a", BooleanType, nullable = true) ::
              StructField("b", IntegerType, nullable = true) ::
              Nil)
        }
      }
    }
  }

  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
    val extraOptions = Map(
      SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
      "spark.sql.parquet.output.committer.class" ->
        classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
    )
    withTempPath { dir =>
      val message = intercept[SparkException] {
        spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath)
      }.getCause.getMessage
      assert(message === "Intentional exception for testing purposes")
    }
  }

478  test("SPARK-6330 regression test") {
479    // In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
480    // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
481    intercept[Throwable] {
482      spark.read.parquet("file:///nonexistent")
483    }
484    val errorMessage = intercept[Throwable] {
485      spark.read.parquet("hdfs://nonexistent")
486    }.toString
487    assert(errorMessage.contains("UnknownHostException"))
488  }
489
490  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
491    // Using a output committer that always fail when committing a task, so that both
492    // `commitTask()` and `abortTask()` are invoked.
493    val extraOptions = Map[String, String](
494      "spark.sql.parquet.output.committer.class" ->
495        classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
496    )
497
498    // Before fixing SPARK-7837, the following code results in an NPE because both
499    // `commitTask()` and `abortTask()` try to close output writers.
500
    withTempPath { dir =>
      val m1 = intercept[SparkException] {
        spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
      }.getCause.getMessage
      assert(m1.contains("Intentional exception for testing purposes"))
    }

    withTempPath { dir =>
      val m2 = intercept[SparkException] {
        val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1)
        df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
      }.getCause.getMessage
      assert(m2.contains("Intentional exception for testing purposes"))
    }
  }

517  test("SPARK-11044 Parquet writer version fixed as version1 ") {
518    // For dictionary encoding, Parquet changes the encoding types according to its writer
519    // version. So, this test checks one of the encoding types in order to ensure that
520    // the file is written with writer version2.
521    val extraOptions = Map[String, String](
522      // Write a Parquet file with writer version2.
523      ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
524      // By default, dictionary encoding is enabled from Parquet 1.2.0 but
525      // it is enabled just in case.
526      ParquetOutputFormat.ENABLE_DICTIONARY -> "true"
527    )
528
    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)

    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
      withTempPath { dir =>
        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
        spark.range(1 << 16).selectExpr("(id % 4) AS i")
          .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)

        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
        val columnChunkMetadata = blockMetadata.getColumns.asScala.head

        // If the file is written with version 2, the encodings should include the
        // Encoding.RLE_DICTIONARY type. For version 1, it is Encoding.PLAIN_DICTIONARY.
        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
      }
    }
  }

547  test("null and non-null strings") {
548    // Create a dataset where the first values are NULL and then some non-null values. The
549    // number of non-nulls needs to be bigger than the ParquetReader batch size.
550    val data: Dataset[String] = spark.range(200).map (i =>
551      if (i < 150) null
552      else "a"
553    )
554    val df = data.toDF("col")
555    assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
556
    withTempPath { dir =>
      val path = s"${dir.getCanonicalPath}/data"
      df.write.parquet(path)

      readParquetFile(path) { df2 =>
        assert(df2.agg("col" -> "count").collect().head.getLong(0) == 50)
      }
    }
  }

  test("read dictionary encoded decimals written as INT32") {
    ("true" :: "false" :: Nil).foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        checkAnswer(
          // Decimal column in this file is encoded using plain dictionary
          readResourceParquetFile("test-data/dec-in-i32.parquet"),
          spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
      }
    }
  }

  test("read dictionary encoded decimals written as INT64") {
    ("true" :: "false" :: Nil).foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        checkAnswer(
          // Decimal column in this file is encoded using plain dictionary
          readResourceParquetFile("test-data/dec-in-i64.parquet"),
          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
      }
    }
  }

  test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
    ("true" :: "false" :: Nil).foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        checkAnswer(
          // Decimal column in this file is encoded using plain dictionary
          readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
      }
    }
  }

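  // Reads the same file with both the vectorized and the non-vectorized reader and checks that
  // copied rows hash identically, i.e. string contents survive the copy().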
600  test("SPARK-12589 copy() on rows returned from reader works for strings") {
601    withTempPath { dir =>
602      val data = (1, "abc") ::(2, "helloabcde") :: Nil
603      data.toDF().write.parquet(dir.getCanonicalPath)
604      var hash1: Int = 0
605      var hash2: Int = 0
606      (false :: true :: Nil).foreach { v =>
607        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> v.toString) {
608          val df = spark.read.parquet(dir.getCanonicalPath)
609          val rows = df.queryExecution.toRdd.map(_.copy()).collect()
610          val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
611          if (!v) {
612            hash1 = unsafeRows(0).hashCode()
613            hash2 = unsafeRows(1).hashCode()
614          } else {
615            assert(hash1 == unsafeRows(0).hashCode())
616            assert(hash2 == unsafeRows(1).hashCode())
617          }
618        }
619      }
620    }
621  }
622
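  // Exercises VectorizedParquetRecordReader directly, bypassing the DataFrame reader: a full
  // projection, a single-column projection, a reordered projection, and an empty projection.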
623  test("VectorizedParquetRecordReader - direct path read") {
624    val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
625    withTempPath { dir =>
626      spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
627      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
628      {
629        val reader = new VectorizedParquetRecordReader
630        try {
631          reader.initialize(file, null)
632          val result = mutable.ArrayBuffer.empty[(Int, String)]
633          while (reader.nextKeyValue()) {
634            val row = reader.getCurrentValue.asInstanceOf[InternalRow]
635            val v = (row.getInt(0), row.getString(1))
636            result += v
637          }
638          assert(data == result)
639        } finally {
640          reader.close()
641        }
642      }
643
644      // Project just one column
645      {
646        val reader = new VectorizedParquetRecordReader
647        try {
648          reader.initialize(file, ("_2" :: Nil).asJava)
649          val result = mutable.ArrayBuffer.empty[(String)]
650          while (reader.nextKeyValue()) {
651            val row = reader.getCurrentValue.asInstanceOf[InternalRow]
652            result += row.getString(0)
653          }
654          assert(data.map(_._2) == result)
655        } finally {
656          reader.close()
657        }
658      }
659
660      // Project columns in opposite order
661      {
662        val reader = new VectorizedParquetRecordReader
663        try {
664          reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
665          val result = mutable.ArrayBuffer.empty[(String, Int)]
666          while (reader.nextKeyValue()) {
667            val row = reader.getCurrentValue.asInstanceOf[InternalRow]
668            val v = (row.getString(0), row.getInt(1))
669            result += v
670          }
671          assert(data.map { x => (x._2, x._1) } == result)
672        } finally {
673          reader.close()
674        }
675      }
676
677      // Empty projection
678      {
679        val reader = new VectorizedParquetRecordReader
680        try {
681          reader.initialize(file, List[String]().asJava)
682          var result = 0
683          while (reader.nextKeyValue()) {
684            result += 1
685          }
686          assert(result == data.length)
687        } finally {
688          reader.close()
689        }
690      }
691    }
692  }
693
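  // Checks that initBatch() correctly appends partition values of each supported data type to
  // the rows produced by the vectorized reader.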
694  test("VectorizedParquetRecordReader - partition column types") {
695    withTempPath { dir =>
696      Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
697
698      val dataTypes =
699        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
700          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
701
702      val constantValues =
703        Seq(
704          UTF8String.fromString("a string"),
705          true,
706          1.toByte,
707          2.toShort,
708          3,
709          Long.MaxValue,
710          0.25.toFloat,
711          0.75D,
712          Decimal("1234.23456"),
713          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
714          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
715
716      dataTypes.zip(constantValues).foreach { case (dt, v) =>
717        val schema = StructType(StructField("pcol", dt) :: Nil)
718        val vectorizedReader = new VectorizedParquetRecordReader
719        val partitionValues = new GenericInternalRow(Array(v))
720        val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
721
722        try {
723          vectorizedReader.initialize(file, null)
724          vectorizedReader.initBatch(schema, partitionValues)
725          vectorizedReader.nextKeyValue()
726          val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
727
          // Copy the row explicitly instead of using the `ColumnarBatch` row directly, in order
          // to use the get(...) method, which is not implemented in `ColumnarBatch`.
          val actual = row.copy().get(1, dt)
          val expected = v
          assert(actual == expected)
        } finally {
          vectorizedReader.close()
        }
      }
    }
  }

740  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
741    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
742      val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
743      assert(option.compressionCodecClassName == "UNCOMPRESSED")
744    }
745  }
746}
747
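// Output committer that fails at job commit time, used by the SPARK-8121 test above to verify
// that the committer configured via spark.sql.parquet.output.committer.class is picked up.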
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    sys.error("Intentional exception for testing purposes")
  }
}

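// Output committer that fails at task commit time, used by the SPARK-7837 test above to ensure
// that output writers are not closed twice when commitTask() fails.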
class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitTask(context: TaskAttemptContext): Unit = {
    sys.error("Intentional exception for testing purposes")
  }
}