/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetWriter}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.sql.QueryTest

/**
 * Helper class for testing Parquet compatibility.
 */
private[sql] abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest {
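  /**
   * Reads the Parquet schema of the files under `path`, skipping files whose names start with
   * an underscore (e.g. `_SUCCESS` or `_metadata`).
   */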
  protected def readParquetSchema(path: String): MessageType = {
    readParquetSchema(path, { path => !path.getName.startsWith("_") })
  }

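  /**
   * Reads the Parquet schema from the first footer found among the files under `path` that
   * satisfy `pathFilter`.
   */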
  protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = {
    val hadoopConf = spark.sessionState.newHadoopConf()
    val fsPath = new Path(path)
    val fs = fsPath.getFileSystem(hadoopConf)
    val parquetFiles = fs.listStatus(fsPath, new PathFilter {
      override def accept(path: Path): Boolean = pathFilter(path)
    }).toSeq.asJava

    val footers =
      ParquetFileReader.readAllFootersInParallel(hadoopConf, parquetFiles, true)
    footers.asScala.head.getParquetMetadata.getFileMetaData.getSchema
  }

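  /** Logs the Parquet schema read from `path` at INFO level. */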
  protected def logParquetSchema(path: String): Unit = {
    logInfo(
      s"""Schema of the Parquet file written by parquet-avro:
         |${readParquetSchema(path)}
       """.stripMargin)
  }
}

private[sql] object ParquetCompatibilityTest {
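  /**
   * A small DSL over [[RecordConsumer]] that pairs each `startXxx`/`endXxx` call, so record
   * writers can be expressed as nested blocks. For illustration only, a record with a single
   * hypothetical `id` column could be written as:
   *
   * {{{
   *   consumer.message {
   *     consumer.field("id", 0) {
   *       consumer.addInteger(42)
   *     }
   *   }
   * }}}
   */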
  implicit class RecordConsumerDSL(consumer: RecordConsumer) {
    def message(f: => Unit): Unit = {
      consumer.startMessage()
      f
      consumer.endMessage()
    }

    def group(f: => Unit): Unit = {
      consumer.startGroup()
      f
      consumer.endGroup()
    }

    def field(name: String, index: Int)(f: => Unit): Unit = {
      consumer.startField(name, index)
      f
      consumer.endField(name, index)
    }
  }

  /**
   * A testing Parquet [[WriteSupport]] implementation used to write manually constructed Parquet
   * records with arbitrary structures.
   */
  private class DirectWriteSupport(schema: MessageType, metadata: Map[String, String])
    extends WriteSupport[RecordConsumer => Unit] {

    private var recordConsumer: RecordConsumer = _

    override def init(configuration: Configuration): WriteContext = {
      new WriteContext(schema, metadata.asJava)
    }

    override def write(recordWriter: RecordConsumer => Unit): Unit = {
      recordWriter.apply(recordConsumer)
    }

    override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
      this.recordConsumer = recordConsumer
    }
  }

  /**
   * Writes arbitrary messages conforming to a given `schema` to a Parquet file located by `path`.
   * Records are produced by `recordWriters`.
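   *
   * A minimal usage sketch (the path, schema, and values below are illustrative only):
   *
   * {{{
   *   writeDirect("/tmp/direct.parquet", "message root { required int32 id; }", { consumer =>
   *     consumer.message {
   *       consumer.field("id", 0) {
   *         consumer.addInteger(42)
   *       }
   *     }
   *   })
   * }}}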
   */
  def writeDirect(path: String, schema: String, recordWriters: (RecordConsumer => Unit)*): Unit = {
    writeDirect(path, schema, Map.empty[String, String], recordWriters: _*)
  }

  /**
   * Writes arbitrary messages conforming to a given `schema` to a Parquet file located by `path`
   * with given user-defined key-value `metadata`. Records are produced by `recordWriters`.
   */
  def writeDirect(
      path: String,
      schema: String,
      metadata: Map[String, String],
      recordWriters: (RecordConsumer => Unit)*): Unit = {
    val messageType = MessageTypeParser.parseMessageType(schema)
    val testWriteSupport = new DirectWriteSupport(messageType, metadata)
    /**
     * Provides a builder for constructing a Parquet writer. After PARQUET-248, constructing the
     * writer directly is deprecated and should be done through a builder. The default builders
     * include Avro, but for raw Parquet writing we must create our own builder.
     */
    class ParquetWriterBuilder() extends
        ParquetWriter.Builder[RecordConsumer => Unit, ParquetWriterBuilder](new Path(path)) {
      override def getWriteSupport(conf: Configuration) = testWriteSupport
      override def self() = this
    }
    val parquetWriter = new ParquetWriterBuilder().build()
    try recordWriters.foreach(parquetWriter.write) finally parquetWriter.close()
  }
}