/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetWriter}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.sql.QueryTest

/**
 * Helper class for testing Parquet compatibility.
 */
private[sql] abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest {
  protected def readParquetSchema(path: String): MessageType = {
    readParquetSchema(path, { path => !path.getName.startsWith("_") })
  }

  protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = {
    val hadoopConf = spark.sessionState.newHadoopConf()
    val fsPath = new Path(path)
    val fs = fsPath.getFileSystem(hadoopConf)
    val parquetFiles = fs.listStatus(fsPath, new PathFilter {
      override def accept(path: Path): Boolean = pathFilter(path)
    }).toSeq.asJava

    val footers =
      ParquetFileReader.readAllFootersInParallel(hadoopConf, parquetFiles, true)
    footers.asScala.head.getParquetMetadata.getFileMetaData.getSchema
  }

  protected def logParquetSchema(path: String): Unit = {
    logInfo(
      s"""Schema of the Parquet file written by parquet-avro:
         |${readParquetSchema(path)}
       """.stripMargin)
  }
}

private[sql] object ParquetCompatibilityTest {
  /**
   * A small DSL wrapping a [[RecordConsumer]] so that messages, groups, and fields can be
   * written with properly paired start/end calls.
   */
  implicit class RecordConsumerDSL(consumer: RecordConsumer) {
    def message(f: => Unit): Unit = {
      consumer.startMessage()
      f
      consumer.endMessage()
    }

    def group(f: => Unit): Unit = {
      consumer.startGroup()
      f
      consumer.endGroup()
    }

    def field(name: String, index: Int)(f: => Unit): Unit = {
      consumer.startField(name, index)
      f
      consumer.endField(name, index)
    }
  }

  /**
   * A testing Parquet [[WriteSupport]] implementation used to write manually constructed Parquet
   * records with arbitrary structures.
   */
  private class DirectWriteSupport(schema: MessageType, metadata: Map[String, String])
    extends WriteSupport[RecordConsumer => Unit] {

    private var recordConsumer: RecordConsumer = _

    override def init(configuration: Configuration): WriteContext = {
      new WriteContext(schema, metadata.asJava)
    }

    override def write(recordWriter: RecordConsumer => Unit): Unit = {
      recordWriter.apply(recordConsumer)
    }

    override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
      this.recordConsumer = recordConsumer
    }
  }

  /**
   * Writes arbitrary messages conforming to a given `schema` to a Parquet file located by `path`.
   * Records are produced by `recordWriters`.
   */
  def writeDirect(path: String, schema: String, recordWriters: (RecordConsumer => Unit)*): Unit = {
    writeDirect(path, schema, Map.empty[String, String], recordWriters: _*)
  }

  /**
   * Writes arbitrary messages conforming to a given `schema` to a Parquet file located by `path`
   * with given user-defined key-value `metadata`. Records are produced by `recordWriters`.
   */
  def writeDirect(
      path: String,
      schema: String,
      metadata: Map[String, String],
      recordWriters: (RecordConsumer => Unit)*): Unit = {
    val messageType = MessageTypeParser.parseMessageType(schema)
    val testWriteSupport = new DirectWriteSupport(messageType, metadata)
    /**
     * Provide a builder for constructing a Parquet writer - after PARQUET-248, directly
     * constructing the writer is deprecated and should be done through a builder. The default
     * builders include Avro - but for raw Parquet writing we must create our own builder.
     */
    class ParquetWriterBuilder() extends
        ParquetWriter.Builder[RecordConsumer => Unit, ParquetWriterBuilder](new Path(path)) {
      override def getWriteSupport(conf: Configuration) = testWriteSupport

      override def self() = this
    }
    val parquetWriter = new ParquetWriterBuilder().build()
    try recordWriters.foreach(parquetWriter.write) finally parquetWriter.close()
  }
}
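// A minimal usage sketch, assuming a hypothetical output path, schema string, and field values:
// it illustrates how `writeDirect` and the `RecordConsumerDSL` above combine to hand-write a
// two-column Parquet file in a test. Importing the companion object brings the DSL into scope,
// so the paired start/end calls on the consumer are handled implicitly.
//
//   import org.apache.parquet.io.api.Binary
//   import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest._
//
//   val schema =
//     """message root {
//       |  required int32 id;
//       |  required binary name (UTF8);
//       |}
//     """.stripMargin
//
//   writeDirect("/tmp/direct.parquet", schema, { consumer =>
//     consumer.message {
//       consumer.field("id", 0) { consumer.addInteger(42) }
//       consumer.field("name", 1) { consumer.addBinary(Binary.fromString("spark")) }
//     }
//   })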