1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package metadata
18
19import (
20	"bytes"
21	"context"
22	"io"
23	"reflect"
24	"unicode/utf8"
25
26	"github.com/apache/arrow/go/v6/parquet"
27	"github.com/apache/arrow/go/v6/parquet/compress"
28	"github.com/apache/arrow/go/v6/parquet/internal/encryption"
29	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
30	"github.com/apache/arrow/go/v6/parquet/internal/thrift"
31	"github.com/apache/arrow/go/v6/parquet/schema"
32	"golang.org/x/xerrors"
33)
34
// DefaultCompressionType is the compression codec used unless a different
// compression is specified in the properties. It is initialized to the
// uncompressed codec.
var DefaultCompressionType = compress.Codecs.Uncompressed
38
// FileMetaDataBuilder is a proxy for more easily constructing file metadata
// particularly used when writing a file out.
//
// Row groups are added through AppendRowGroup and the accumulated state is
// turned into a FileMetaData by Finish.
type FileMetaDataBuilder struct {
	metadata       *format.FileMetaData       // thrift metadata populated by Finish
	props          *parquet.WriterProperties  // source of version, created-by and encryption settings
	schema         *schema.Schema             // schema written into the metadata
	rowGroups      []*format.RowGroup         // row groups appended so far
	currentRgBldr  *RowGroupMetaDataBuilder   // builder returned by the latest AppendRowGroup
	kvmeta         KeyValueMetadata           // key/value metadata stored as-is in the footer
	cryptoMetadata *format.FileCryptoMetaData // non-nil only when an encrypted footer is requested
}
50
51// NewFileMetadataBuilder will use the default writer properties if nil is passed for
52// the writer properties and nil is allowable for the key value metadata.
53func NewFileMetadataBuilder(schema *schema.Schema, props *parquet.WriterProperties, kvmeta KeyValueMetadata) *FileMetaDataBuilder {
54	var crypto *format.FileCryptoMetaData
55	if props.FileEncryptionProperties() != nil && props.FileEncryptionProperties().EncryptedFooter() {
56		crypto = format.NewFileCryptoMetaData()
57	}
58	return &FileMetaDataBuilder{
59		metadata:       format.NewFileMetaData(),
60		props:          props,
61		schema:         schema,
62		kvmeta:         kvmeta,
63		cryptoMetadata: crypto,
64	}
65}
66
67// GetFileCryptoMetaData returns the cryptographic information for encrypting/
68// decrypting the file.
69func (f *FileMetaDataBuilder) GetFileCryptoMetaData() *FileCryptoMetadata {
70	if f.cryptoMetadata == nil {
71		return nil
72	}
73
74	props := f.props.FileEncryptionProperties()
75	f.cryptoMetadata.EncryptionAlgorithm = props.Algorithm().ToThrift()
76	keyMetadata := props.FooterKeyMetadata()
77	if keyMetadata != "" {
78		f.cryptoMetadata.KeyMetadata = []byte(keyMetadata)
79	}
80
81	return &FileCryptoMetadata{f.cryptoMetadata, 0}
82}
83
84// AppendRowGroup adds a rowgroup to the list and returns a builder
85// for that row group
86func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder {
87	if f.rowGroups == nil {
88		f.rowGroups = make([]*format.RowGroup, 0, 1)
89	}
90
91	rg := format.NewRowGroup()
92	f.rowGroups = append(f.rowGroups, rg)
93	f.currentRgBldr = NewRowGroupMetaDataBuilder(f.props, f.schema, rg)
94	return f.currentRgBldr
95}
96
97// Finish will finalize the metadata of the number of rows, row groups,
98// version etc. This will clear out this filemetadatabuilder so it can
99// be re-used
100func (f *FileMetaDataBuilder) Finish() (*FileMetaData, error) {
101	totalRows := int64(0)
102	for _, rg := range f.rowGroups {
103		totalRows += rg.NumRows
104	}
105	f.metadata.NumRows = totalRows
106	f.metadata.RowGroups = f.rowGroups
107	switch f.props.Version() {
108	case parquet.V1_0:
109		f.metadata.Version = 1
110	default:
111		f.metadata.Version = 2
112	}
113	createdBy := f.props.CreatedBy()
114	f.metadata.CreatedBy = &createdBy
115
116	// Users cannot set the `ColumnOrder` since we do not not have user defined sort order
117	// in the spec yet.
118	//
119	// We always default to `TYPE_DEFINED_ORDER`. We can expose it in
120	// the API once we have user defined sort orders in the Parquet format.
121	// TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType
122	typeDefined := format.NewTypeDefinedOrder()
123	colOrder := &format.ColumnOrder{TYPE_ORDER: typeDefined}
124	f.metadata.ColumnOrders = make([]*format.ColumnOrder, f.schema.NumColumns())
125	for idx := range f.metadata.ColumnOrders {
126		f.metadata.ColumnOrders[idx] = colOrder
127	}
128
129	encryptProps := f.props.FileEncryptionProperties()
130	if encryptProps != nil && !encryptProps.EncryptedFooter() {
131		var signingAlgo parquet.Algorithm
132		algo := encryptProps.Algorithm()
133		signingAlgo.Aad.AadFileUnique = algo.Aad.AadFileUnique
134		signingAlgo.Aad.SupplyAadPrefix = algo.Aad.SupplyAadPrefix
135		if !algo.Aad.SupplyAadPrefix {
136			signingAlgo.Aad.AadPrefix = algo.Aad.AadPrefix
137		}
138		signingAlgo.Algo = parquet.AesGcm
139		f.metadata.EncryptionAlgorithm = signingAlgo.ToThrift()
140		footerSigningMetadata := f.props.FileEncryptionProperties().FooterKeyMetadata()
141		if footerSigningMetadata != "" {
142			f.metadata.FooterSigningKeyMetadata = []byte(footerSigningMetadata)
143		}
144	}
145
146	f.metadata.Schema = schema.ToThrift(f.schema.Root())
147	f.metadata.KeyValueMetadata = f.kvmeta
148
149	out := &FileMetaData{
150		FileMetaData: f.metadata,
151		version:      NewAppVersion(f.metadata.GetCreatedBy()),
152	}
153	if err := out.initSchema(); err != nil {
154		return nil, err
155	}
156	out.initColumnOrders()
157
158	f.metadata = format.NewFileMetaData()
159	f.rowGroups = nil
160	return out, nil
161}
162
// KeyValueMetadata is a slice of thrift keyvalue pairs with convenience
// methods for building and querying it.
//
// It is presumed that the metadata should all be utf8 valid.
type KeyValueMetadata []*format.KeyValue
167
168// NewKeyValueMetadata is equivalent to make(KeyValueMetadata, 0)
169func NewKeyValueMetadata() KeyValueMetadata {
170	return make(KeyValueMetadata, 0)
171}
172
173// Append adds the passed in key and value to the metadata, if either contains
174// any invalid utf8 runes, then it is not added and an error is returned.
175func (k *KeyValueMetadata) Append(key, value string) error {
176	if !utf8.ValidString(key) || !utf8.ValidString(value) {
177		return xerrors.Errorf("metadata must be valid utf8 strings, got key = '%s' and value = '%s'", key, value)
178	}
179	*k = append(*k, &format.KeyValue{Key: key, Value: &value})
180	return nil
181}
182
183func (k KeyValueMetadata) Len() int { return len(k) }
184
// Equals compares all of the metadata keys and values to check they are equal.
//
// The comparison is delegated to reflect.DeepEqual, so the entries must match
// in the same order, and a nil KeyValueMetadata is not considered equal to an
// empty, non-nil one.
func (k KeyValueMetadata) Equals(other KeyValueMetadata) bool {
	return reflect.DeepEqual(k, other)
}
189
190func (k KeyValueMetadata) Keys() (ret []string) {
191	ret = make([]string, len(k))
192	for idx, v := range k {
193		ret[idx] = v.GetKey()
194	}
195	return
196}
197
198func (k KeyValueMetadata) Values() (ret []string) {
199	ret = make([]string, len(k))
200	for idx, v := range k {
201		ret[idx] = v.GetValue()
202	}
203	return
204}
205
206func (k KeyValueMetadata) FindValue(key string) *string {
207	for _, v := range k {
208		if v.Key == key {
209			return v.Value
210		}
211	}
212	return nil
213}
214
// FileMetaData is a proxy around the underlying thrift FileMetaData object
// to make it easier to use and interact with.
//
// The thrift object is embedded, so its fields and getters are promoted to
// this type.
type FileMetaData struct {
	*format.FileMetaData
	Schema        *schema.Schema           // schema tree built from the flattened thrift schema
	FileDecryptor encryption.FileDecryptor // decryptor passed on to row group metadata; may be nil

	// app version of the writer for this file, parsed from the created by string
	version *AppVersion
	// size of the raw bytes of the metadata in the file which were
	// decoded by thrift, Size() getter returns the value.
	metadataLen int
}
228
229// NewFileMetaData takes in the raw bytes of the serialized metadata to deserialize
230// and will attempt to decrypt the footer if a decryptor is provided.
231func NewFileMetaData(data []byte, fileDecryptor encryption.FileDecryptor) (*FileMetaData, error) {
232	meta := format.NewFileMetaData()
233	if fileDecryptor != nil {
234		footerDecryptor := fileDecryptor.GetFooterDecryptor()
235		data = footerDecryptor.Decrypt(data)
236	}
237
238	remain, err := thrift.DeserializeThrift(meta, data)
239	if err != nil {
240		return nil, err
241	}
242
243	f := &FileMetaData{
244		FileMetaData:  meta,
245		version:       NewAppVersion(meta.GetCreatedBy()),
246		metadataLen:   len(data) - int(remain),
247		FileDecryptor: fileDecryptor,
248	}
249
250	f.initSchema()
251	f.initColumnOrders()
252
253	return f, nil
254}
255
256// Size is the length of the raw serialized metadata bytes in the footer
257func (f *FileMetaData) Size() int { return f.metadataLen }
258
259// NumSchemaElements is the length of the flattened schema list in the thrift
260func (f *FileMetaData) NumSchemaElements() int {
261	return len(f.FileMetaData.Schema)
262}
263
264// RowGroup provides the metadata for the (0-based) index of the row group
265func (f *FileMetaData) RowGroup(i int) *RowGroupMetaData {
266	return &RowGroupMetaData{
267		f.RowGroups[i], f.Schema, f.version, f.FileDecryptor,
268	}
269}
270
271func (f *FileMetaData) Serialize(ctx context.Context) ([]byte, error) {
272	return thrift.NewThriftSerializer().Write(ctx, f.FileMetaData)
273}
274
275func (f *FileMetaData) SerializeString(ctx context.Context) (string, error) {
276	return thrift.NewThriftSerializer().WriteString(ctx, f.FileMetaData)
277}
278
279// EncryptionAlgorithm constructs the algorithm object from the thrift
280// information or returns an empty instance if it was not set.
281func (f *FileMetaData) EncryptionAlgorithm() parquet.Algorithm {
282	if f.IsSetEncryptionAlgorithm() {
283		return parquet.AlgorithmFromThrift(f.GetEncryptionAlgorithm())
284	}
285	return parquet.Algorithm{}
286}
287
288func (f *FileMetaData) initSchema() error {
289	root, err := schema.FromParquet(f.FileMetaData.Schema)
290	if err != nil {
291		return err
292	}
293	f.Schema = schema.NewSchema(root.(*schema.GroupNode))
294	return nil
295}
296
297func (f *FileMetaData) initColumnOrders() {
298	orders := make([]parquet.ColumnOrder, 0, f.Schema.NumColumns())
299	if f.IsSetColumnOrders() {
300		for _, o := range f.GetColumnOrders() {
301			if o.IsSetTYPE_ORDER() {
302				orders = append(orders, parquet.ColumnOrders.TypeDefinedOrder)
303			} else {
304				orders = append(orders, parquet.ColumnOrders.Undefined)
305			}
306		}
307	} else {
308		orders = orders[:f.Schema.NumColumns()]
309		orders[0] = parquet.ColumnOrders.Undefined
310		for i := 1; i < len(orders); i *= 2 {
311			copy(orders[i:], orders[:i])
312		}
313	}
314	f.Schema.UpdateColumnOrders(orders)
315}
316
317// WriterVersion returns the constructed application version from the
318// created by string
319func (f *FileMetaData) WriterVersion() *AppVersion {
320	if f.version == nil {
321		f.version = NewAppVersion(f.GetCreatedBy())
322	}
323	return f.version
324}
325
326// SetFilePath will set the file path into all of the columns in each row group.
327func (f *FileMetaData) SetFilePath(path string) {
328	for _, rg := range f.RowGroups {
329		for _, chunk := range rg.Columns {
330			chunk.FilePath = &path
331		}
332	}
333}
334
335// AppendRowGroups will add all of the rowgroup metadata from other to the
336// current file metadata
337func (f *FileMetaData) AppendRowGroups(other *FileMetaData) error {
338	if !f.Schema.Equals(other.Schema) {
339		return xerrors.New("parquet/FileMetaData: AppendRowGroups requires equal schemas")
340	}
341
342	f.RowGroups = append(f.RowGroups, other.GetRowGroups()...)
343	for _, rg := range other.GetRowGroups() {
344		f.NumRows += rg.NumRows
345	}
346	return nil
347}
348
349// Subset will construct a new FileMetaData object containing only the requested
350// row groups by index
351func (f *FileMetaData) Subset(rowGroups []int) (*FileMetaData, error) {
352	for _, i := range rowGroups {
353		if i < len(f.RowGroups) {
354			continue
355		}
356		return nil, xerrors.Errorf("parquet: this file only has %d row groups, but requested a subset including row group: %d", len(f.RowGroups), i)
357	}
358
359	out := &FileMetaData{
360		&format.FileMetaData{
361			Schema:                   f.FileMetaData.Schema,
362			CreatedBy:                f.CreatedBy,
363			ColumnOrders:             f.GetColumnOrders(),
364			EncryptionAlgorithm:      f.FileMetaData.EncryptionAlgorithm,
365			FooterSigningKeyMetadata: f.FooterSigningKeyMetadata,
366			Version:                  f.FileMetaData.Version,
367			KeyValueMetadata:         f.KeyValueMetadata(),
368		},
369		f.Schema,
370		f.FileDecryptor,
371		f.version,
372		0,
373	}
374
375	out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups))
376	for _, selected := range rowGroups {
377		out.RowGroups = append(out.RowGroups, f.RowGroups[selected])
378		out.NumRows += f.RowGroups[selected].GetNumRows()
379	}
380
381	return out, nil
382}
383
384func (f *FileMetaData) Equals(other *FileMetaData) bool {
385	return reflect.DeepEqual(f.FileMetaData, other.FileMetaData)
386}
387
388func (f *FileMetaData) KeyValueMetadata() KeyValueMetadata {
389	return f.GetKeyValueMetadata()
390}
391
392// VerifySignature constructs a cryptographic signature using the FileDecryptor
393// of the footer and then verifies it's integrity.
394//
395// Panics if f.FileDecryptor is nil
396func (f *FileMetaData) VerifySignature(signature []byte) bool {
397	if f.FileDecryptor == nil {
398		panic("decryption not set propertly, cannot verify signature")
399	}
400
401	serializer := thrift.NewThriftSerializer()
402	data, _ := serializer.Write(context.Background(), f.FileMetaData)
403	nonce := signature[:encryption.NonceLength]
404	tag := signature[encryption.NonceLength : encryption.NonceLength+encryption.GcmTagLength]
405
406	key := f.FileDecryptor.GetFooterKey()
407	aad := encryption.CreateFooterAad(f.FileDecryptor.FileAad())
408
409	enc := encryption.NewAesEncryptor(f.FileDecryptor.Algorithm(), true)
410	var buf bytes.Buffer
411	buf.Grow(enc.CiphertextSizeDelta() + len(data))
412	encryptedLen := enc.SignedFooterEncrypt(&buf, data, []byte(key), []byte(aad), nonce)
413	return bytes.Equal(buf.Bytes()[encryptedLen-encryption.GcmTagLength:], tag)
414}
415
416// WriteTo will serialize and write out this file metadata, encrypting it if
417// appropriate.
418//
419// If it is an encrypted file with a plaintext footer, then we will write the
420// signature with the unencrypted footer.
421func (f *FileMetaData) WriteTo(w io.Writer, encryptor encryption.Encryptor) (int64, error) {
422	serializer := thrift.NewThriftSerializer()
423	// only in encrypted files with plaintext footers, the encryption algorithm is set in the footer
424	if f.IsSetEncryptionAlgorithm() {
425		data, err := serializer.Write(context.Background(), f.FileMetaData)
426		if err != nil {
427			return 0, err
428		}
429
430		// encrypt the footer key
431		var buf bytes.Buffer
432		buf.Grow(encryptor.CiphertextSizeDelta() + len(data))
433		encryptedLen := encryptor.Encrypt(&buf, data)
434
435		wrote := 0
436		n := 0
437		// write unencrypted footer
438		if n, err = w.Write(data); err != nil {
439			return int64(n), err
440		}
441		wrote += n
442		// write signature (nonce and tag)
443		buf.Next(4)
444		if n, err = w.Write(buf.Next(encryption.NonceLength)); err != nil {
445			return int64(wrote + n), err
446		}
447		wrote += n
448		buf.Next(encryptedLen - 4 - encryption.NonceLength - encryption.GcmTagLength)
449		n, err = w.Write(buf.Next(encryption.GcmTagLength))
450		return int64(wrote + n), err
451	}
452	n, err := serializer.Serialize(f.FileMetaData, w, encryptor)
453	return int64(n), err
454}
455
456// Version returns the "version" of the file
457//
458// WARNING: The value returned by this method is unreliable as 1) the
459// parquet file metadata stores the version as a single integer and
460// 2) some producers are known to always write a hardcoded value. Therefore
461// you cannot use this value to know which features are used in the file.
462func (f *FileMetaData) Version() parquet.Version {
463	switch f.FileMetaData.Version {
464	case 1:
465		return parquet.V1_0
466	case 2:
467		return parquet.V2_LATEST
468	default:
469		// imporperly set version, assume parquet 1.0
470		return parquet.V1_0
471	}
472}
473
// FileCryptoMetadata is a proxy for the thrift fileCryptoMetadata object
type FileCryptoMetadata struct {
	metadata          *format.FileCryptoMetaData // underlying thrift object
	cryptoMetadataLen uint32                     // bytes consumed when deserialized, reported by Len()
}
479
480// NewFileCryptoMetaData takes in the raw serialized bytes to deserialize
481// storing the number of bytes that were actually deserialized.
482func NewFileCryptoMetaData(metadata []byte) (ret FileCryptoMetadata, err error) {
483	ret.metadata = format.NewFileCryptoMetaData()
484	var remain uint64
485	remain, err = thrift.DeserializeThrift(ret.metadata, metadata)
486	ret.cryptoMetadataLen = uint32(uint64(len(metadata)) - remain)
487	return
488}
489
490// WriteTo writes out the serialized crypto metadata to w
491func (fc FileCryptoMetadata) WriteTo(w io.Writer) (int64, error) {
492	serializer := thrift.NewThriftSerializer()
493	n, err := serializer.Serialize(fc.metadata, w, nil)
494	return int64(n), err
495}
496
497// Len is the number of bytes that were deserialized to create this object
498func (fc FileCryptoMetadata) Len() int { return int(fc.cryptoMetadataLen) }
499
500func (fc FileCryptoMetadata) KeyMetadata() []byte {
501	return fc.metadata.KeyMetadata
502}
503
504// EncryptionAlgorithm constructs the object from the thrift instance of
505// the encryption algorithm
506func (fc FileCryptoMetadata) EncryptionAlgorithm() parquet.Algorithm {
507	return parquet.AlgorithmFromThrift(fc.metadata.GetEncryptionAlgorithm())
508}
509