1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package metadata
18
19import (
20	"bytes"
21	"context"
22	"io"
23	"reflect"
24
25	"github.com/apache/arrow/go/v6/arrow/memory"
26	"github.com/apache/arrow/go/v6/parquet"
27	"github.com/apache/arrow/go/v6/parquet/compress"
28	"github.com/apache/arrow/go/v6/parquet/internal/encryption"
29	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
30	"github.com/apache/arrow/go/v6/parquet/internal/thrift"
31	"github.com/apache/arrow/go/v6/parquet/schema"
32	"golang.org/x/xerrors"
33)
34
// PageEncodingStats is used for counting the number of pages of specific
// types with the given internal encoding.
type PageEncodingStats struct {
	// Encoding is the parquet encoding used by the pages being counted.
	Encoding parquet.Encoding
	// PageType is the thrift page type (data page, dictionary page, etc.)
	// that the count applies to.
	PageType format.PageType
}
41
// statvalues wraps format.Statistics so that the generic Min/Max accessors
// resolve to the newer MinValue/MaxValue fields rather than the deprecated
// Min/Max fields of the thrift struct. It is used for columns whose sort
// order is TypeDefinedOrder (see makeColumnStats).
type statvalues struct {
	*format.Statistics
}

// GetMin redirects to the non-deprecated MinValue field.
func (s *statvalues) GetMin() []byte { return s.GetMinValue() }

// GetMax redirects to the non-deprecated MaxValue field.
func (s *statvalues) GetMax() []byte { return s.GetMaxValue() }

// IsSetMin reports whether MinValue (not the deprecated Min) is set.
func (s *statvalues) IsSetMin() bool { return s.IsSetMinValue() }

// IsSetMax reports whether MaxValue (not the deprecated Max) is set.
func (s *statvalues) IsSetMax() bool { return s.IsSetMaxValue() }
50
51func makeColumnStats(metadata *format.ColumnMetaData, descr *schema.Column, mem memory.Allocator) TypedStatistics {
52	if descr.ColumnOrder() == parquet.ColumnOrders.TypeDefinedOrder {
53		return NewStatisticsFromEncoded(descr, mem,
54			metadata.NumValues-metadata.Statistics.GetNullCount(),
55			&statvalues{metadata.Statistics})
56	}
57	return NewStatisticsFromEncoded(descr, mem,
58		metadata.NumValues-metadata.Statistics.GetNullCount(),
59		metadata.Statistics)
60}
61
// ColumnChunkMetaData is a proxy around format.ColumnChunkMetaData
// containing all of the information and metadata for a given column chunk
// and it's associated Column
type ColumnChunkMetaData struct {
	// column is the raw thrift ColumnChunk this proxies.
	column *format.ColumnChunk
	// columnMeta points either at column's metadata or at decryptedMeta
	// when the column metadata had to be decrypted.
	columnMeta *format.ColumnMetaData
	// decryptedMeta holds the deserialized plaintext metadata for an
	// encrypted column; columnMeta aliases it in that case.
	decryptedMeta format.ColumnMetaData
	descr         *schema.Column
	// writerVersion is consulted to work around stat bugs in old writers.
	writerVersion *AppVersion
	// encodings/encodingStats are converted copies of the thrift values.
	encodings     []parquet.Encoding
	encodingStats []format.PageEncodingStats
	// possibleStats is lazily populated by StatsSet.
	possibleStats TypedStatistics
	mem           memory.Allocator
}
76
77// NewColumnChunkMetaData creates an instance of the metadata from a column chunk and descriptor
78//
79// this is primarily used internally or between the subpackages. ColumnChunkMetaDataBuilder should
80// be used by consumers instead of using this directly.
81func NewColumnChunkMetaData(column *format.ColumnChunk, descr *schema.Column, writerVersion *AppVersion, rowGroupOrdinal, columnOrdinal int16, fileDecryptor encryption.FileDecryptor) (*ColumnChunkMetaData, error) {
82	c := &ColumnChunkMetaData{
83		column:        column,
84		columnMeta:    column.GetMetaData(),
85		descr:         descr,
86		writerVersion: writerVersion,
87		mem:           memory.DefaultAllocator,
88	}
89	if column.IsSetCryptoMetadata() {
90		ccmd := column.CryptoMetadata
91
92		if ccmd.IsSetENCRYPTION_WITH_COLUMN_KEY() {
93			if fileDecryptor != nil && fileDecryptor.Properties() != nil {
94				// should decrypt metadata
95				path := parquet.ColumnPath(ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetPathInSchema())
96				keyMetadata := ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetKeyMetadata()
97				aadColumnMetadata := encryption.CreateModuleAad(fileDecryptor.FileAad(), encryption.ColumnMetaModule, rowGroupOrdinal, columnOrdinal, -1)
98				decryptor := fileDecryptor.GetColumnMetaDecryptor(path.String(), string(keyMetadata), aadColumnMetadata)
99				thrift.DeserializeThrift(&c.decryptedMeta, decryptor.Decrypt(column.GetEncryptedColumnMetadata()))
100				c.columnMeta = &c.decryptedMeta
101			} else {
102				return nil, xerrors.New("cannot decrypt column metadata. file decryption not setup correctly")
103			}
104		}
105	}
106	for _, enc := range c.columnMeta.Encodings {
107		c.encodings = append(c.encodings, parquet.Encoding(enc))
108	}
109	for _, enc := range c.columnMeta.EncodingStats {
110		c.encodingStats = append(c.encodingStats, *enc)
111	}
112	return c, nil
113}
114
// CryptoMetadata returns the cryptographic metadata for how this column was
// encrypted and how to decrypt it.
func (c *ColumnChunkMetaData) CryptoMetadata() *format.ColumnCryptoMetaData {
	return c.column.GetCryptoMetadata()
}

// FileOffset is the location in the file where the column data begins
func (c *ColumnChunkMetaData) FileOffset() int64 { return c.column.FileOffset }

// FilePath gives the name of the parquet file if provided in the metadata
// (empty for data stored in the same file as the footer).
func (c *ColumnChunkMetaData) FilePath() string { return c.column.GetFilePath() }

// Type is the physical storage type used in the parquet file for this column chunk.
func (c *ColumnChunkMetaData) Type() parquet.Type { return parquet.Type(c.columnMeta.Type) }

// NumValues is the number of values stored in just this chunk including nulls.
func (c *ColumnChunkMetaData) NumValues() int64 { return c.columnMeta.NumValues }

// PathInSchema is the full path to this column from the root of the schema including
// any nested columns
func (c *ColumnChunkMetaData) PathInSchema() parquet.ColumnPath {
	return c.columnMeta.GetPathInSchema()
}

// Compression provides the type of compression used for this particular chunk.
func (c *ColumnChunkMetaData) Compression() compress.Compression {
	return compress.Compression(c.columnMeta.Codec)
}

// Encodings returns the list of different encodings used in this chunk;
// the slice is the proxy's own copy built at construction time.
func (c *ColumnChunkMetaData) Encodings() []parquet.Encoding { return c.encodings }
146
147// EncodingStats connects the order of encodings based on the list of pages and types
148func (c *ColumnChunkMetaData) EncodingStats() []PageEncodingStats {
149	ret := make([]PageEncodingStats, len(c.encodingStats))
150	for idx := range ret {
151		ret[idx].Encoding = parquet.Encoding(c.encodingStats[idx].Encoding)
152		ret[idx].PageType = c.encodingStats[idx].PageType
153	}
154	return ret
155}
156
// HasDictionaryPage returns true if there is a dictionary page offset set in
// this metadata.
func (c *ColumnChunkMetaData) HasDictionaryPage() bool {
	return c.columnMeta.IsSetDictionaryPageOffset()
}

// DictionaryPageOffset returns the location in the file where the dictionary page starts.
// Only meaningful when HasDictionaryPage reports true.
func (c *ColumnChunkMetaData) DictionaryPageOffset() int64 {
	return c.columnMeta.GetDictionaryPageOffset()
}

// DataPageOffset returns the location in the file where the data pages begin for this column
func (c *ColumnChunkMetaData) DataPageOffset() int64 { return c.columnMeta.GetDataPageOffset() }

// HasIndexPage returns true if the offset for the index page is set in the metadata
func (c *ColumnChunkMetaData) HasIndexPage() bool { return c.columnMeta.IsSetIndexPageOffset() }

// IndexPageOffset is the location in the file where the index page starts.
// Only meaningful when HasIndexPage reports true.
func (c *ColumnChunkMetaData) IndexPageOffset() int64 { return c.columnMeta.GetIndexPageOffset() }

// TotalCompressedSize will be equal to TotalUncompressedSize if the data is not compressed.
// Otherwise this will be the size of the actual data in the file.
func (c *ColumnChunkMetaData) TotalCompressedSize() int64 {
	return c.columnMeta.GetTotalCompressedSize()
}

// TotalUncompressedSize is the total size of the raw data after uncompressing the chunk
func (c *ColumnChunkMetaData) TotalUncompressedSize() int64 {
	return c.columnMeta.GetTotalUncompressedSize()
}

// BloomFilterOffset is the byte offset from the beginning of the file to the bloom
// filter data.
func (c *ColumnChunkMetaData) BloomFilterOffset() int64 {
	return c.columnMeta.GetBloomFilterOffset()
}
193
194// StatsSet returns true only if there are statistics set in the metadata and the column
195// descriptor has a sort order that is not SortUnknown
196//
197// It also checks the writer version to ensure that it was not written by a version
198// of parquet which is known to have incorrect stat computations.
199func (c *ColumnChunkMetaData) StatsSet() (bool, error) {
200	if !c.columnMeta.IsSetStatistics() || c.descr.SortOrder() == schema.SortUNKNOWN {
201		return false, nil
202	}
203
204	if c.possibleStats == nil {
205		c.possibleStats = makeColumnStats(c.columnMeta, c.descr, c.mem)
206	}
207
208	encoded, err := c.possibleStats.Encode()
209	if err != nil {
210		return false, err
211	}
212
213	return c.writerVersion.HasCorrectStatistics(c.Type(), c.descr.LogicalType(), encoded, c.descr.SortOrder()), nil
214}
215
// Equals reports whether the underlying thrift column metadata of the two
// ColumnChunkMetaData proxies are deeply equal.
func (c *ColumnChunkMetaData) Equals(other *ColumnChunkMetaData) bool {
	return reflect.DeepEqual(c.columnMeta, other.columnMeta)
}
219
220// Statistics can return nil if there are no stats in this metadata
221func (c *ColumnChunkMetaData) Statistics() (TypedStatistics, error) {
222	ok, err := c.StatsSet()
223	if err != nil {
224		return nil, err
225	}
226
227	if ok {
228		return c.possibleStats, nil
229	}
230	return nil, nil
231}
232
// ColumnChunkMetaDataBuilder is used during writing to construct metadata
// for a given column chunk while writing, providing a proxy around constructing
// the actual thrift object.
type ColumnChunkMetaDataBuilder struct {
	chunk  *format.ColumnChunk
	props  *parquet.WriterProperties
	column *schema.Column

	// compressedSize preserves the total compressed size after Finish nils
	// out the MetaData field for fully encrypted columns.
	compressedSize int64
}

// NewColumnChunkMetaDataBuilder returns a builder seeded with a fresh, empty
// thrift ColumnChunk for the given column and writer properties.
func NewColumnChunkMetaDataBuilder(props *parquet.WriterProperties, column *schema.Column) *ColumnChunkMetaDataBuilder {
	return NewColumnChunkMetaDataBuilderWithContents(props, column, format.NewColumnChunk())
}
247
248// NewColumnChunkMetaDataBuilderWithContents will construct a builder and start it with the provided
249// column chunk information rather than with an empty column chunk.
250func NewColumnChunkMetaDataBuilderWithContents(props *parquet.WriterProperties, column *schema.Column, chunk *format.ColumnChunk) *ColumnChunkMetaDataBuilder {
251	b := &ColumnChunkMetaDataBuilder{
252		props:  props,
253		column: column,
254		chunk:  chunk,
255	}
256	b.init(chunk)
257	return b
258}
259
// Contents returns the underlying thrift ColumnChunk object so that it can be used
// for constructing or duplicating column metadata
func (c *ColumnChunkMetaDataBuilder) Contents() *format.ColumnChunk { return c.chunk }

// init adopts the provided chunk, creating its MetaData if absent, and seeds
// the metadata fields that are known up front from the column descriptor and
// writer properties (physical type, schema path, compression codec).
func (c *ColumnChunkMetaDataBuilder) init(chunk *format.ColumnChunk) {
	c.chunk = chunk
	if !c.chunk.IsSetMetaData() {
		c.chunk.MetaData = format.NewColumnMetaData()
	}
	c.chunk.MetaData.Type = format.Type(c.column.PhysicalType())
	c.chunk.MetaData.PathInSchema = schema.ColumnPathFromNode(c.column.SchemaNode())
	c.chunk.MetaData.Codec = format.CompressionCodec(c.props.CompressionFor(c.column.Path()))
}
273
// SetFilePath sets the file path in the column chunk metadata, used when the
// column data lives in a separate file from the footer.
func (c *ColumnChunkMetaDataBuilder) SetFilePath(val string) {
	c.chunk.FilePath = &val
}

// Descr returns the associated column descriptor for this column chunk
func (c *ColumnChunkMetaDataBuilder) Descr() *schema.Column { return c.column }

// TotalCompressedSize returns the total compressed size of this column chunk.
func (c *ColumnChunkMetaDataBuilder) TotalCompressedSize() int64 {
	// if this column is encrypted, after Finish is called, the MetaData
	// field is set to nil and we store the compressed size so return that
	if c.chunk.MetaData == nil {
		return c.compressedSize
	}
	return c.chunk.MetaData.GetTotalCompressedSize()
}

// SetStats stores the encoded statistics in the chunk's thrift metadata.
func (c *ColumnChunkMetaDataBuilder) SetStats(val EncodedStatistics) {
	c.chunk.MetaData.Statistics = val.ToThrift()
}
293
// ChunkMetaInfo is a helper struct for passing the offset and size information
// for finishing the building of column chunk metadata
type ChunkMetaInfo struct {
	// NumValues is the total value count for the chunk including nulls.
	NumValues int64
	// DictPageOffset is the dictionary page offset; values <= 0 mean
	// no dictionary page was written.
	DictPageOffset  int64
	IndexPageOffset int64
	DataPageOffset  int64
	// CompressedSize/UncompressedSize are the chunk's on-disk and raw sizes.
	CompressedSize   int64
	UncompressedSize int64
}

// EncodingStats is a helper struct for passing the encoding stat information
// for finishing up metadata for a column chunk.
type EncodingStats struct {
	// DictEncodingStats maps dictionary-page encodings to page counts.
	DictEncodingStats map[parquet.Encoding]int32
	// DataEncodingStats maps data-page encodings to page counts.
	DataEncodingStats map[parquet.Encoding]int32
}
311
// Finish finalizes the metadata with the given offsets,
// flushes any compression that needs to be done, and performs
// any encryption if an encryptor is provided.
func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict, dictFallback bool, encStats EncodingStats, metaEncryptor encryption.Encryptor) error {
	// FileOffset is computed from the first page of the chunk (dictionary
	// page when present, otherwise the first data page) plus the compressed size.
	if info.DictPageOffset > 0 {
		c.chunk.MetaData.DictionaryPageOffset = &info.DictPageOffset
		c.chunk.FileOffset = info.DictPageOffset + info.CompressedSize
	} else {
		c.chunk.FileOffset = info.DataPageOffset + info.CompressedSize
	}

	c.chunk.MetaData.NumValues = info.NumValues
	if info.IndexPageOffset >= 0 {
		c.chunk.MetaData.IndexPageOffset = &info.IndexPageOffset
	}

	c.chunk.MetaData.DataPageOffset = info.DataPageOffset
	c.chunk.MetaData.TotalUncompressedSize = info.UncompressedSize
	c.chunk.MetaData.TotalCompressedSize = info.CompressedSize

	// no matter the configuration, the maximum number of thrift encodings we'll
	// populate is going to be 3:
	// 	1. potential dictionary index encoding
	//	2. page encoding
	//	3. RLE for repetition and definition levels
	// so let's preallocate a capacity of 3 but initialize the slice at 0 len
	const maxEncodings = 3

	thriftEncodings := make([]format.Encoding, 0, maxEncodings)
	if hasDict {
		thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryIndexEncoding()))
		if c.props.Version() == parquet.V1_0 {
			// V1 data pages for dictionaries are always PLAIN encoded
			thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN)
		} else {
			thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryPageEncoding()))
		}
	} else { // no dictionary
		thriftEncodings = append(thriftEncodings, format.Encoding(c.props.EncodingFor(c.column.Path())))
	}

	thriftEncodings = append(thriftEncodings, format.Encoding(parquet.Encodings.RLE))
	// Only PLAIN encoding is supported for fallback in V1
	// TODO(zeroshade): Use user specified encoding for V2
	if dictFallback {
		thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN)
	}
	c.chunk.MetaData.Encodings = thriftEncodings

	// translate the per-encoding page counts into thrift PageEncodingStats,
	// dictionary pages first then data pages
	thriftEncodingStats := make([]*format.PageEncodingStats, 0, len(encStats.DictEncodingStats)+len(encStats.DataEncodingStats))
	for k, v := range encStats.DictEncodingStats {
		thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{
			PageType: format.PageType_DICTIONARY_PAGE,
			Encoding: format.Encoding(k),
			Count:    v,
		})
	}
	for k, v := range encStats.DataEncodingStats {
		thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{
			PageType: format.PageType_DATA_PAGE,
			Encoding: format.Encoding(k),
			Count:    v,
		})
	}
	c.chunk.MetaData.EncodingStats = thriftEncodingStats

	encryptProps := c.props.ColumnEncryptionProperties(c.column.Path())
	if encryptProps != nil && encryptProps.IsEncrypted() {
		ccmd := format.NewColumnCryptoMetaData()
		if encryptProps.IsEncryptedWithFooterKey() {
			ccmd.ENCRYPTION_WITH_FOOTER_KEY = format.NewEncryptionWithFooterKey()
		} else {
			ccmd.ENCRYPTION_WITH_COLUMN_KEY = &format.EncryptionWithColumnKey{
				KeyMetadata:  []byte(encryptProps.KeyMetadata()),
				PathInSchema: c.column.ColumnPath(),
			}
		}
		c.chunk.CryptoMetadata = ccmd

		// metadata is encrypted separately unless the whole footer is
		// encrypted with the footer key anyway
		encryptedFooter := c.props.FileEncryptionProperties().EncryptedFooter()
		encryptMetadata := !encryptedFooter || !encryptProps.IsEncryptedWithFooterKey()
		if encryptMetadata {
			// Serialize and encrypt ColumnMetadata separately
			// Thrift-serialize the ColumnMetaData structure,
			// encrypt it with the column key, and write to encrypted_column_metadata
			serializer := thrift.NewThriftSerializer()
			data, err := serializer.Write(context.Background(), c.chunk.MetaData)
			if err != nil {
				return err
			}
			var buf bytes.Buffer
			metaEncryptor.Encrypt(&buf, data)
			c.chunk.EncryptedColumnMetadata = buf.Bytes()

			if encryptedFooter {
				// stash the size before dropping the plaintext metadata so
				// TotalCompressedSize keeps working after Finish
				c.compressedSize = c.chunk.MetaData.GetTotalCompressedSize()
				c.chunk.MetaData = nil
			} else {
				// Keep redacted metadata version for old readers
				c.chunk.MetaData.Statistics = nil
				c.chunk.MetaData.EncodingStats = nil
			}
		}
	}
	return nil
}
417
// WriteTo will always return 0 as the int64 since the thrift writer library
// does not return the number of bytes written, we only use the signature
// of (int64, error) in order to match the standard WriteTo interfaces.
func (c *ColumnChunkMetaDataBuilder) WriteTo(w io.Writer) (int64, error) {
	// serialize the entire thrift ColumnChunk (not just its metadata) to w
	return 0, thrift.SerializeThriftStream(c.chunk, w)
}
424