/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is, after all, not intended to be used in this way?),
// and its members are private, so we need our own write support in order to pass the
// schema to the writer.
private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
  var groupWriter: GroupWriter = null

  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
    groupWriter = new GroupWriter(recordConsumer, schema)
  }

  override def init(configuration: Configuration): WriteContext = {
    new WriteContext(schema, new java.util.HashMap[String, String]())
  }

  override def write(record: Group): Unit = {
    groupWriter.write(record)
  }
}

/**
 * A test suite that tests basic Parquet I/O.
 */
class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
  import testImplicits._

  /**
   * Writes `data` to a Parquet file, reads it back, and checks the file contents.
   */
  protected def checkParquetFile[T <: Product : ClassTag : TypeTag](data: Seq[T]): Unit = {
    withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
  }

  test("basic data types (without binary)") {
    val data = (1 to 4).map { i =>
      (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
    }
    checkParquetFile(data)
  }

  test("raw binary") {
    val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
    withParquetDataFrame(data) { df =>
      assertResult(data.map(_._1.mkString(",")).sorted) {
        df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
      }
    }
  }

  test("SPARK-11694 Parquet logical types are not being tested properly") {
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 a(INT_8);
        |  required int32 b(INT_16);
        |  required int32 c(DATE);
        |  required int32 d(DECIMAL(1,0));
        |  required int64 e(DECIMAL(10,0));
        |  required binary f(UTF8);
        |  required binary g(ENUM);
        |  required binary h(DECIMAL(32,0));
        |  required fixed_len_byte_array(32) i(DECIMAL(32,0));
        |}
      """.stripMargin)

    val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
      DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0))

    withTempPath { location =>
      val path = new Path(location.getCanonicalPath)
      val conf = spark.sessionState.newHadoopConf()
      writeMetadata(parquetSchema, path, conf)
      readParquetFile(path.toString)(df => {
        val sparkTypes = df.schema.map(_.dataType)
        assert(sparkTypes === expectedSparkTypes)
      })
    }
  }

  test("string") {
    val data = (1 to 4).map(i => Tuple1(i.toString))
    // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by
    // Spark SQL, as we store the Spark SQL schema in the extra metadata.
    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data))
    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
  }

  testStandardAndLegacyModes("fixed-length decimals") {
    def makeDecimalRDD(decimal: DecimalType): DataFrame = {
      spark
        .range(1000)
        // Parquet doesn't allow column names with spaces, have to add an alias here.
        // Minus 500 here so that negative decimals are also tested.
        .select((('id - 500) / 100.0) cast decimal as 'dec)
        .coalesce(1)
    }

    val combinations = Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))
    for ((precision, scale) <- combinations) {
      withTempPath { dir =>
        val data = makeDecimalRDD(DecimalType(precision, scale))
        data.write.parquet(dir.getCanonicalPath)
        readParquetFile(dir.getCanonicalPath) { df =>
          checkAnswer(df, data.collect().toSeq)
        }
      }
    }
  }

  test("date type") {
    def makeDateRDD(): DataFrame =
      sparkContext
        .parallelize(0 to 1000)
        .map(i => Tuple1(DateTimeUtils.toJavaDate(i)))
        .toDF()
        .select($"_1")

    withTempPath { dir =>
      val data = makeDateRDD()
      data.write.parquet(dir.getCanonicalPath)
      readParquetFile(dir.getCanonicalPath) { df =>
        checkAnswer(df, data.collect().toSeq)
      }
    }
  }

  testStandardAndLegacyModes("map") {
    val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
    checkParquetFile(data)
  }

  testStandardAndLegacyModes("array") {
    val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
    checkParquetFile(data)
  }

  testStandardAndLegacyModes("array and double") {
    val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble)))
    checkParquetFile(data)
  }

  testStandardAndLegacyModes("struct") {
    val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
    withParquetDataFrame(data) { df =>
      // Structs are converted to `Row`s
      checkAnswer(df, data.map { case Tuple1(struct) =>
        Row(Row(struct.productIterator.toSeq: _*))
      })
    }
  }

  testStandardAndLegacyModes("nested struct with array of array as field") {
    val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
    withParquetDataFrame(data) { df =>
      // Structs are converted to `Row`s
      checkAnswer(df, data.map { case Tuple1(struct) =>
        Row(Row(struct.productIterator.toSeq: _*))
      })
    }
  }

  testStandardAndLegacyModes("nested map with struct as value type") {
    val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
    withParquetDataFrame(data) { df =>
      checkAnswer(df, data.map { case Tuple1(m) =>
        Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
      })
    }
  }

  test("nulls") {
    val allNulls = (
      null.asInstanceOf[java.lang.Boolean],
      null.asInstanceOf[Integer],
      null.asInstanceOf[java.lang.Long],
      null.asInstanceOf[java.lang.Float],
      null.asInstanceOf[java.lang.Double])

    withParquetDataFrame(allNulls :: Nil) { df =>
      val rows = df.collect()
      assert(rows.length === 1)
      assert(rows.head === Row(Seq.fill(5)(null): _*))
    }
  }

  test("nones") {
    val allNones = (
      None.asInstanceOf[Option[Int]],
      None.asInstanceOf[Option[Long]],
      None.asInstanceOf[Option[String]])

    withParquetDataFrame(allNones :: Nil) { df =>
      val rows = df.collect()
      assert(rows.length === 1)
      assert(rows.head === Row(Seq.fill(3)(null): _*))
    }
  }

  test("SPARK-10113 Support for unsigned Parquet logical types") {
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 c(UINT_32);
        |}
      """.stripMargin)

    withTempPath { location =>
      val path = new Path(location.getCanonicalPath)
      val conf = spark.sessionState.newHadoopConf()
      writeMetadata(parquetSchema, path, conf)
      val errorMessage = intercept[Throwable] {
        spark.read.parquet(path.toString).printSchema()
      }.toString
      assert(errorMessage.contains("Parquet type not supported"))
    }
  }

  test("SPARK-11692 Support for Parquet logical types, JSON and BSON (embedded types)") {
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required binary a(JSON);
        |  required binary b(BSON);
        |}
      """.stripMargin)

    val expectedSparkTypes = Seq(StringType, BinaryType)

    withTempPath { location =>
      val path = new Path(location.getCanonicalPath)
      val conf = spark.sessionState.newHadoopConf()
      writeMetadata(parquetSchema, path, conf)
      val sparkTypes = spark.read.parquet(path.toString).schema.map(_.dataType)
      assert(sparkTypes === expectedSparkTypes)
    }
  }

  test("compression codec") {
    val hadoopConf = spark.sessionState.newHadoopConf()
    def compressionCodecFor(path: String, codecName: String): String = {
      val codecs = for {
        footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
        block <- footer.getParquetMetadata.getBlocks.asScala
        column <- block.getColumns.asScala
      } yield column.getCodec.name()

      assert(codecs.distinct === Seq(codecName))
      codecs.head
    }

    val data = (0 until 10).map(i => (i, i.toString))

    def checkCompressionCodec(codec: CompressionCodecName): Unit = {
      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
        withParquetFile(data) { path =>
          assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase) {
            compressionCodecFor(path, codec.name())
          }
        }
      }
    }

    // Check the default compression codec
    checkCompressionCodec(
      CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))

    checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
    checkCompressionCodec(CompressionCodecName.GZIP)
    checkCompressionCodec(CompressionCodecName.SNAPPY)
  }

  test("read raw Parquet file") {
    def makeRawParquetFile(path: Path): Unit = {
      val schema = MessageTypeParser.parseMessageType(
        """
          |message root {
          |  required boolean _1;
          |  required int32 _2;
          |  required int64 _3;
          |  required float _4;
          |  required double _5;
          |}
        """.stripMargin)

      val testWriteSupport = new TestGroupWriteSupport(schema)
      /**
       * Provide a builder for constructing a Parquet writer: after PARQUET-248, constructing
       * the writer directly is deprecated and should be done through a builder. The default
       * builders include Avro, but for raw Parquet writing we must create our own builder.
       */
      class ParquetWriterBuilder() extends
          ParquetWriter.Builder[Group, ParquetWriterBuilder](path) {
        override def getWriteSupport(conf: Configuration) = testWriteSupport

        override def self() = this
      }

      val writer = new ParquetWriterBuilder().build()

      (0 until 10).foreach { i =>
        val record = new SimpleGroup(schema)
        record.add(0, i % 2 == 0)
        record.add(1, i)
        record.add(2, i.toLong)
        record.add(3, i.toFloat)
        record.add(4, i.toDouble)
        writer.write(record)
      }

      writer.close()
    }

    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "part-r-0.parquet")
      makeRawParquetFile(path)
      readParquetFile(path.toString) { df =>
        checkAnswer(df, (0 until 10).map { i =>
          Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) })
      }
    }
  }

  test("write metadata") {
    val hadoopConf = spark.sessionState.newHadoopConf()
    withTempPath { file =>
      val path = new Path(file.toURI.toString)
      val fs = FileSystem.getLocal(hadoopConf)
      val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)])
      writeMetadata(schema, path, hadoopConf)

      assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
      assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))

      val expectedSchema = new ParquetSchemaConverter().convert(schema)
      val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema

      actualSchema.checkContains(expectedSchema)
      expectedSchema.checkContains(actualSchema)
    }
  }

  test("save - overwrite") {
    withParquetFile((1 to 10).map(i => (i, i.toString))) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      newData.toDF().write.format("parquet").mode(SaveMode.Overwrite).save(file)
      readParquetFile(file) { df =>
        checkAnswer(df, newData.map(Row.fromTuple))
      }
    }
  }

  test("save - ignore") {
    val data = (1 to 10).map(i => (i, i.toString))
    withParquetFile(data) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      newData.toDF().write.format("parquet").mode(SaveMode.Ignore).save(file)
      readParquetFile(file) { df =>
        checkAnswer(df, data.map(Row.fromTuple))
      }
    }
  }

  test("save - throw") {
    val data = (1 to 10).map(i => (i, i.toString))
    withParquetFile(data) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      val errorMessage = intercept[Throwable] {
        newData.toDF().write.format("parquet").mode(SaveMode.ErrorIfExists).save(file)
      }.getMessage
      assert(errorMessage.contains("already exists"))
    }
  }

  test("save - append") {
    val data = (1 to 10).map(i => (i, i.toString))
    withParquetFile(data) { file =>
      val newData = (11 to 20).map(i => (i, i.toString))
      newData.toDF().write.format("parquet").mode(SaveMode.Append).save(file)
      readParquetFile(file) { df =>
        checkAnswer(df, (data ++ newData).map(Row.fromTuple))
      }
    }
  }

  test("SPARK-6315 regression test") {
    // Spark 1.1 and prior versions wrote the Spark schema as a case class string into Parquet
    // metadata. Since 1.2, the schema is stored as JSON instead. Note that 1.3 further refactored
    // the data types API and made StructType.fields an array, so the result of StructType.toString
    // differs from prior versions: there is no "Seq" wrapping the fields part of the string now.
    val sparkSchema =
      "StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))"

    // The Parquet schema is intentionally made different from the Spark schema, because the
    // Parquet data source simply falls back to the Parquet schema when it fails to parse the
    // Spark schema. By making the two different, we can assert that the old-style case class
    // string is parsed successfully.
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 c;
        |}
      """.stripMargin)

    withTempPath { location =>
      val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
      val path = new Path(location.getCanonicalPath)
      val conf = spark.sessionState.newHadoopConf()
      writeMetadata(parquetSchema, path, conf, extraMetadata)

      readParquetFile(path.toString) { df =>
        assertResult(df.schema) {
          StructType(
            StructField("a", BooleanType, nullable = true) ::
            StructField("b", IntegerType, nullable = true) ::
            Nil)
        }
      }
    }
  }

  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
    val extraOptions = Map(
      SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
      "spark.sql.parquet.output.committer.class" ->
        classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
    )
    withTempPath { dir =>
      val message = intercept[SparkException] {
        spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath)
      }.getCause.getMessage
      assert(message === "Intentional exception for testing purposes")
    }
  }

  test("SPARK-6330 regression test") {
    // In 1.3.0, saving to a filesystem other than file: without configuring core-site.xml would
    // fail with: IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
    intercept[Throwable] {
      spark.read.parquet("file:///nonexistent")
    }
    val errorMessage = intercept[Throwable] {
      spark.read.parquet("hdfs://nonexistent")
    }.toString
    assert(errorMessage.contains("UnknownHostException"))
  }

  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
    // Use an output committer that always fails when committing a task, so that both
    // `commitTask()` and `abortTask()` are invoked.
    val extraOptions = Map[String, String](
      "spark.sql.parquet.output.committer.class" ->
        classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
    )

    // Before fixing SPARK-7837, the following code resulted in an NPE because both
    // `commitTask()` and `abortTask()` tried to close the output writers.

    withTempPath { dir =>
      val m1 = intercept[SparkException] {
        spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
      }.getCause.getMessage
      assert(m1.contains("Intentional exception for testing purposes"))
    }

    withTempPath { dir =>
      val m2 = intercept[SparkException] {
        val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1)
        df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
      }.getCause.getMessage
      assert(m2.contains("Intentional exception for testing purposes"))
    }
  }

  test("SPARK-11044 Parquet writer version fixed as version1 ") {
    // For dictionary encoding, Parquet changes the encoding types according to its writer
    // version.
    // So, this test checks one of the encoding types in order to ensure that the file is
    // written with writer version2.
    val extraOptions = Map[String, String](
      // Write a Parquet file with writer version2.
      ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
      // Dictionary encoding is enabled by default as of Parquet 1.2.0, but enable it explicitly
      // just in case.
      ParquetOutputFormat.ENABLE_DICTIONARY -> "true"
    )

    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)

    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
      withTempPath { dir =>
        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
        spark.range(1 << 16).selectExpr("(id % 4) AS i")
          .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)

        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
        val columnChunkMetadata = blockMetadata.getColumns.asScala.head

        // If the file is written with version2, the encodings should include
        // Encoding.RLE_DICTIONARY; for version1, it would be Encoding.PLAIN_DICTIONARY.
        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
      }
    }
  }

  test("null and non-null strings") {
    // Create a dataset where the first values are NULL and then some non-null values. The
    // number of non-nulls needs to be bigger than the ParquetReader batch size.
    val data: Dataset[String] = spark.range(200).map(i =>
      if (i < 150) null
      else "a"
    )
    val df = data.toDF("col")
    assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)

    withTempPath { dir =>
      val path = s"${dir.getCanonicalPath}/data"
      df.write.parquet(path)

      readParquetFile(path) { df2 =>
        assert(df2.agg("col" -> "count").collect().head.getLong(0) == 50)
      }
    }
  }

  test("read dictionary encoded decimals written as INT32") {
    ("true" :: "false" :: Nil).foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        checkAnswer(
          // Decimal column in this file is encoded using plain dictionary
          readResourceParquetFile("test-data/dec-in-i32.parquet"),
          spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
      }
    }
  }

  test("read dictionary encoded decimals written as INT64") {
    ("true" :: "false" :: Nil).foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        checkAnswer(
          // Decimal column in this file is encoded using plain dictionary
          readResourceParquetFile("test-data/dec-in-i64.parquet"),
          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
      }
    }
  }

  test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
    ("true" :: "false" :: Nil).foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        checkAnswer(
          // Decimal column in this file is encoded using plain dictionary
          readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
      }
    }
  }

  test("SPARK-12589 copy() on rows returned from reader works for strings") {
    withTempPath { dir =>
      val data = (1, "abc") :: (2, "helloabcde") :: Nil
      data.toDF().write.parquet(dir.getCanonicalPath)
      var hash1: Int = 0
      var hash2: Int = 0
      (false :: true :: Nil).foreach { v =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> v.toString) {
          val df = spark.read.parquet(dir.getCanonicalPath)
          val rows = df.queryExecution.toRdd.map(_.copy()).collect()
          val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
          if (!v) {
            hash1 = unsafeRows(0).hashCode()
            hash2 = unsafeRows(1).hashCode()
          } else {
            assert(hash1 == unsafeRows(0).hashCode())
            assert(hash2 == unsafeRows(1).hashCode())
          }
        }
      }
    }
  }

  test("VectorizedParquetRecordReader - direct path read") {
    val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
    withTempPath { dir =>
      spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
      {
        val reader = new VectorizedParquetRecordReader
        try {
          reader.initialize(file, null)
          val result = mutable.ArrayBuffer.empty[(Int, String)]
          while (reader.nextKeyValue()) {
            val row = reader.getCurrentValue.asInstanceOf[InternalRow]
            val v = (row.getInt(0), row.getString(1))
            result += v
          }
          assert(data == result)
        } finally {
          reader.close()
        }
      }

      // Project just one column
      {
        val reader = new VectorizedParquetRecordReader
        try {
          reader.initialize(file, ("_2" :: Nil).asJava)
          val result = mutable.ArrayBuffer.empty[(String)]
          while (reader.nextKeyValue()) {
            val row = reader.getCurrentValue.asInstanceOf[InternalRow]
            result += row.getString(0)
          }
          assert(data.map(_._2) == result)
        } finally {
          reader.close()
        }
      }

      // Project columns in opposite order
      {
        val reader = new VectorizedParquetRecordReader
        try {
          reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
          val result = mutable.ArrayBuffer.empty[(String, Int)]
          while (reader.nextKeyValue()) {
            val row = reader.getCurrentValue.asInstanceOf[InternalRow]
            val v = (row.getString(0), row.getInt(1))
            result += v
          }
          assert(data.map { x => (x._2, x._1) } == result)
        } finally {
          reader.close()
        }
      }

      // Empty projection
      {
        val reader = new VectorizedParquetRecordReader
        try {
          reader.initialize(file, List[String]().asJava)
          var result = 0
          while (reader.nextKeyValue()) {
            result += 1
          }
          assert(result == data.length)
        } finally {
          reader.close()
        }
      }
    }
  }

  test("VectorizedParquetRecordReader - partition column types") {
    withTempPath { dir =>
      Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)

      val dataTypes =
        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)

      val constantValues =
        Seq(
          UTF8String.fromString("a string"),
          true,
          1.toByte,
          2.toShort,
          3,
          Long.MaxValue,
          0.25.toFloat,
          0.75D,
          Decimal("1234.23456"),
          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))

      dataTypes.zip(constantValues).foreach { case (dt, v) =>
        val schema = StructType(StructField("pcol", dt) :: Nil)
        val vectorizedReader = new VectorizedParquetRecordReader
        val partitionValues = new GenericInternalRow(Array(v))
        val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)

        try {
          vectorizedReader.initialize(file, null)
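          // `initBatch` appends the partition column (filled with the constant partition value)
          // after the columns read from the file, which is why the value is read back at
          // ordinal 1 below.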
          vectorizedReader.initBatch(schema, partitionValues)
          vectorizedReader.nextKeyValue()
          val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]

          // Copy the row into a generic row rather than reading from `ColumnarBatch` directly,
          // in order to use the get(...) method, which is not implemented in `ColumnarBatch`.
          val actual = row.copy().get(1, dt)
          val expected = v
          assert(actual == expected)
        } finally {
          vectorizedReader.close()
        }
      }
    }
  }

  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
      val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
      assert(option.compressionCodecClassName == "UNCOMPRESSED")
    }
  }
}

class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    sys.error("Intentional exception for testing purposes")
  }
}

class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitTask(context: TaskAttemptContext): Unit = {
    sys.error("Intentional exception for testing purposes")
  }
}