// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metadata

import (
	"bytes"
	"context"
	"io"
	"reflect"
	"unicode/utf8"

	"github.com/apache/arrow/go/v6/parquet"
	"github.com/apache/arrow/go/v6/parquet/compress"
	"github.com/apache/arrow/go/v6/parquet/internal/encryption"
	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
	"github.com/apache/arrow/go/v6/parquet/internal/thrift"
	"github.com/apache/arrow/go/v6/parquet/schema"
	"golang.org/x/xerrors"
)

// DefaultCompressionType is the codec used unless a different compression is
// specified in the properties. It is initialized to compress.Codecs.Uncompressed.
var DefaultCompressionType = compress.Codecs.Uncompressed

// FileMetaDataBuilder is a proxy for more easily constructing file metadata
// particularly used when writing a file out.
41type FileMetaDataBuilder struct { 42 metadata *format.FileMetaData 43 props *parquet.WriterProperties 44 schema *schema.Schema 45 rowGroups []*format.RowGroup 46 currentRgBldr *RowGroupMetaDataBuilder 47 kvmeta KeyValueMetadata 48 cryptoMetadata *format.FileCryptoMetaData 49} 50 51// NewFileMetadataBuilder will use the default writer properties if nil is passed for 52// the writer properties and nil is allowable for the key value metadata. 53func NewFileMetadataBuilder(schema *schema.Schema, props *parquet.WriterProperties, kvmeta KeyValueMetadata) *FileMetaDataBuilder { 54 var crypto *format.FileCryptoMetaData 55 if props.FileEncryptionProperties() != nil && props.FileEncryptionProperties().EncryptedFooter() { 56 crypto = format.NewFileCryptoMetaData() 57 } 58 return &FileMetaDataBuilder{ 59 metadata: format.NewFileMetaData(), 60 props: props, 61 schema: schema, 62 kvmeta: kvmeta, 63 cryptoMetadata: crypto, 64 } 65} 66 67// GetFileCryptoMetaData returns the cryptographic information for encrypting/ 68// decrypting the file. 
69func (f *FileMetaDataBuilder) GetFileCryptoMetaData() *FileCryptoMetadata { 70 if f.cryptoMetadata == nil { 71 return nil 72 } 73 74 props := f.props.FileEncryptionProperties() 75 f.cryptoMetadata.EncryptionAlgorithm = props.Algorithm().ToThrift() 76 keyMetadata := props.FooterKeyMetadata() 77 if keyMetadata != "" { 78 f.cryptoMetadata.KeyMetadata = []byte(keyMetadata) 79 } 80 81 return &FileCryptoMetadata{f.cryptoMetadata, 0} 82} 83 84// AppendRowGroup adds a rowgroup to the list and returns a builder 85// for that row group 86func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder { 87 if f.rowGroups == nil { 88 f.rowGroups = make([]*format.RowGroup, 0, 1) 89 } 90 91 rg := format.NewRowGroup() 92 f.rowGroups = append(f.rowGroups, rg) 93 f.currentRgBldr = NewRowGroupMetaDataBuilder(f.props, f.schema, rg) 94 return f.currentRgBldr 95} 96 97// Finish will finalize the metadata of the number of rows, row groups, 98// version etc. This will clear out this filemetadatabuilder so it can 99// be re-used 100func (f *FileMetaDataBuilder) Finish() (*FileMetaData, error) { 101 totalRows := int64(0) 102 for _, rg := range f.rowGroups { 103 totalRows += rg.NumRows 104 } 105 f.metadata.NumRows = totalRows 106 f.metadata.RowGroups = f.rowGroups 107 switch f.props.Version() { 108 case parquet.V1_0: 109 f.metadata.Version = 1 110 default: 111 f.metadata.Version = 2 112 } 113 createdBy := f.props.CreatedBy() 114 f.metadata.CreatedBy = &createdBy 115 116 // Users cannot set the `ColumnOrder` since we do not not have user defined sort order 117 // in the spec yet. 118 // 119 // We always default to `TYPE_DEFINED_ORDER`. We can expose it in 120 // the API once we have user defined sort orders in the Parquet format. 
121 // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType 122 typeDefined := format.NewTypeDefinedOrder() 123 colOrder := &format.ColumnOrder{TYPE_ORDER: typeDefined} 124 f.metadata.ColumnOrders = make([]*format.ColumnOrder, f.schema.NumColumns()) 125 for idx := range f.metadata.ColumnOrders { 126 f.metadata.ColumnOrders[idx] = colOrder 127 } 128 129 encryptProps := f.props.FileEncryptionProperties() 130 if encryptProps != nil && !encryptProps.EncryptedFooter() { 131 var signingAlgo parquet.Algorithm 132 algo := encryptProps.Algorithm() 133 signingAlgo.Aad.AadFileUnique = algo.Aad.AadFileUnique 134 signingAlgo.Aad.SupplyAadPrefix = algo.Aad.SupplyAadPrefix 135 if !algo.Aad.SupplyAadPrefix { 136 signingAlgo.Aad.AadPrefix = algo.Aad.AadPrefix 137 } 138 signingAlgo.Algo = parquet.AesGcm 139 f.metadata.EncryptionAlgorithm = signingAlgo.ToThrift() 140 footerSigningMetadata := f.props.FileEncryptionProperties().FooterKeyMetadata() 141 if footerSigningMetadata != "" { 142 f.metadata.FooterSigningKeyMetadata = []byte(footerSigningMetadata) 143 } 144 } 145 146 f.metadata.Schema = schema.ToThrift(f.schema.Root()) 147 f.metadata.KeyValueMetadata = f.kvmeta 148 149 out := &FileMetaData{ 150 FileMetaData: f.metadata, 151 version: NewAppVersion(f.metadata.GetCreatedBy()), 152 } 153 if err := out.initSchema(); err != nil { 154 return nil, err 155 } 156 out.initColumnOrders() 157 158 f.metadata = format.NewFileMetaData() 159 f.rowGroups = nil 160 return out, nil 161} 162 163// KeyValueMetadata is an alias for a slice of thrift keyvalue pairs. 164// 165// It is presumed that the metadata should all be utf8 valid. 
166type KeyValueMetadata []*format.KeyValue 167 168// NewKeyValueMetadata is equivalent to make(KeyValueMetadata, 0) 169func NewKeyValueMetadata() KeyValueMetadata { 170 return make(KeyValueMetadata, 0) 171} 172 173// Append adds the passed in key and value to the metadata, if either contains 174// any invalid utf8 runes, then it is not added and an error is returned. 175func (k *KeyValueMetadata) Append(key, value string) error { 176 if !utf8.ValidString(key) || !utf8.ValidString(value) { 177 return xerrors.Errorf("metadata must be valid utf8 strings, got key = '%s' and value = '%s'", key, value) 178 } 179 *k = append(*k, &format.KeyValue{Key: key, Value: &value}) 180 return nil 181} 182 183func (k KeyValueMetadata) Len() int { return len(k) } 184 185// Equals compares all of the metadata keys and values to check they are equal 186func (k KeyValueMetadata) Equals(other KeyValueMetadata) bool { 187 return reflect.DeepEqual(k, other) 188} 189 190func (k KeyValueMetadata) Keys() (ret []string) { 191 ret = make([]string, len(k)) 192 for idx, v := range k { 193 ret[idx] = v.GetKey() 194 } 195 return 196} 197 198func (k KeyValueMetadata) Values() (ret []string) { 199 ret = make([]string, len(k)) 200 for idx, v := range k { 201 ret[idx] = v.GetValue() 202 } 203 return 204} 205 206func (k KeyValueMetadata) FindValue(key string) *string { 207 for _, v := range k { 208 if v.Key == key { 209 return v.Value 210 } 211 } 212 return nil 213} 214 215// FileMetaData is a proxy around the underlying thrift FileMetaData object 216// to make it easier to use and interact with. 217type FileMetaData struct { 218 *format.FileMetaData 219 Schema *schema.Schema 220 FileDecryptor encryption.FileDecryptor 221 222 // app version of the writer for this file 223 version *AppVersion 224 // size of the raw bytes of the metadata in the file which were 225 // decoded by thrift, Size() getter returns the value. 
226 metadataLen int 227} 228 229// NewFileMetaData takes in the raw bytes of the serialized metadata to deserialize 230// and will attempt to decrypt the footer if a decryptor is provided. 231func NewFileMetaData(data []byte, fileDecryptor encryption.FileDecryptor) (*FileMetaData, error) { 232 meta := format.NewFileMetaData() 233 if fileDecryptor != nil { 234 footerDecryptor := fileDecryptor.GetFooterDecryptor() 235 data = footerDecryptor.Decrypt(data) 236 } 237 238 remain, err := thrift.DeserializeThrift(meta, data) 239 if err != nil { 240 return nil, err 241 } 242 243 f := &FileMetaData{ 244 FileMetaData: meta, 245 version: NewAppVersion(meta.GetCreatedBy()), 246 metadataLen: len(data) - int(remain), 247 FileDecryptor: fileDecryptor, 248 } 249 250 f.initSchema() 251 f.initColumnOrders() 252 253 return f, nil 254} 255 256// Size is the length of the raw serialized metadata bytes in the footer 257func (f *FileMetaData) Size() int { return f.metadataLen } 258 259// NumSchemaElements is the length of the flattened schema list in the thrift 260func (f *FileMetaData) NumSchemaElements() int { 261 return len(f.FileMetaData.Schema) 262} 263 264// RowGroup provides the metadata for the (0-based) index of the row group 265func (f *FileMetaData) RowGroup(i int) *RowGroupMetaData { 266 return &RowGroupMetaData{ 267 f.RowGroups[i], f.Schema, f.version, f.FileDecryptor, 268 } 269} 270 271func (f *FileMetaData) Serialize(ctx context.Context) ([]byte, error) { 272 return thrift.NewThriftSerializer().Write(ctx, f.FileMetaData) 273} 274 275func (f *FileMetaData) SerializeString(ctx context.Context) (string, error) { 276 return thrift.NewThriftSerializer().WriteString(ctx, f.FileMetaData) 277} 278 279// EncryptionAlgorithm constructs the algorithm object from the thrift 280// information or returns an empty instance if it was not set. 
281func (f *FileMetaData) EncryptionAlgorithm() parquet.Algorithm { 282 if f.IsSetEncryptionAlgorithm() { 283 return parquet.AlgorithmFromThrift(f.GetEncryptionAlgorithm()) 284 } 285 return parquet.Algorithm{} 286} 287 288func (f *FileMetaData) initSchema() error { 289 root, err := schema.FromParquet(f.FileMetaData.Schema) 290 if err != nil { 291 return err 292 } 293 f.Schema = schema.NewSchema(root.(*schema.GroupNode)) 294 return nil 295} 296 297func (f *FileMetaData) initColumnOrders() { 298 orders := make([]parquet.ColumnOrder, 0, f.Schema.NumColumns()) 299 if f.IsSetColumnOrders() { 300 for _, o := range f.GetColumnOrders() { 301 if o.IsSetTYPE_ORDER() { 302 orders = append(orders, parquet.ColumnOrders.TypeDefinedOrder) 303 } else { 304 orders = append(orders, parquet.ColumnOrders.Undefined) 305 } 306 } 307 } else { 308 orders = orders[:f.Schema.NumColumns()] 309 orders[0] = parquet.ColumnOrders.Undefined 310 for i := 1; i < len(orders); i *= 2 { 311 copy(orders[i:], orders[:i]) 312 } 313 } 314 f.Schema.UpdateColumnOrders(orders) 315} 316 317// WriterVersion returns the constructed application version from the 318// created by string 319func (f *FileMetaData) WriterVersion() *AppVersion { 320 if f.version == nil { 321 f.version = NewAppVersion(f.GetCreatedBy()) 322 } 323 return f.version 324} 325 326// SetFilePath will set the file path into all of the columns in each row group. 327func (f *FileMetaData) SetFilePath(path string) { 328 for _, rg := range f.RowGroups { 329 for _, chunk := range rg.Columns { 330 chunk.FilePath = &path 331 } 332 } 333} 334 335// AppendRowGroups will add all of the rowgroup metadata from other to the 336// current file metadata 337func (f *FileMetaData) AppendRowGroups(other *FileMetaData) error { 338 if !f.Schema.Equals(other.Schema) { 339 return xerrors.New("parquet/FileMetaData: AppendRowGroups requires equal schemas") 340 } 341 342 f.RowGroups = append(f.RowGroups, other.GetRowGroups()...) 
343 for _, rg := range other.GetRowGroups() { 344 f.NumRows += rg.NumRows 345 } 346 return nil 347} 348 349// Subset will construct a new FileMetaData object containing only the requested 350// row groups by index 351func (f *FileMetaData) Subset(rowGroups []int) (*FileMetaData, error) { 352 for _, i := range rowGroups { 353 if i < len(f.RowGroups) { 354 continue 355 } 356 return nil, xerrors.Errorf("parquet: this file only has %d row groups, but requested a subset including row group: %d", len(f.RowGroups), i) 357 } 358 359 out := &FileMetaData{ 360 &format.FileMetaData{ 361 Schema: f.FileMetaData.Schema, 362 CreatedBy: f.CreatedBy, 363 ColumnOrders: f.GetColumnOrders(), 364 EncryptionAlgorithm: f.FileMetaData.EncryptionAlgorithm, 365 FooterSigningKeyMetadata: f.FooterSigningKeyMetadata, 366 Version: f.FileMetaData.Version, 367 KeyValueMetadata: f.KeyValueMetadata(), 368 }, 369 f.Schema, 370 f.FileDecryptor, 371 f.version, 372 0, 373 } 374 375 out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups)) 376 for _, selected := range rowGroups { 377 out.RowGroups = append(out.RowGroups, f.RowGroups[selected]) 378 out.NumRows += f.RowGroups[selected].GetNumRows() 379 } 380 381 return out, nil 382} 383 384func (f *FileMetaData) Equals(other *FileMetaData) bool { 385 return reflect.DeepEqual(f.FileMetaData, other.FileMetaData) 386} 387 388func (f *FileMetaData) KeyValueMetadata() KeyValueMetadata { 389 return f.GetKeyValueMetadata() 390} 391 392// VerifySignature constructs a cryptographic signature using the FileDecryptor 393// of the footer and then verifies it's integrity. 
394// 395// Panics if f.FileDecryptor is nil 396func (f *FileMetaData) VerifySignature(signature []byte) bool { 397 if f.FileDecryptor == nil { 398 panic("decryption not set propertly, cannot verify signature") 399 } 400 401 serializer := thrift.NewThriftSerializer() 402 data, _ := serializer.Write(context.Background(), f.FileMetaData) 403 nonce := signature[:encryption.NonceLength] 404 tag := signature[encryption.NonceLength : encryption.NonceLength+encryption.GcmTagLength] 405 406 key := f.FileDecryptor.GetFooterKey() 407 aad := encryption.CreateFooterAad(f.FileDecryptor.FileAad()) 408 409 enc := encryption.NewAesEncryptor(f.FileDecryptor.Algorithm(), true) 410 var buf bytes.Buffer 411 buf.Grow(enc.CiphertextSizeDelta() + len(data)) 412 encryptedLen := enc.SignedFooterEncrypt(&buf, data, []byte(key), []byte(aad), nonce) 413 return bytes.Equal(buf.Bytes()[encryptedLen-encryption.GcmTagLength:], tag) 414} 415 416// WriteTo will serialize and write out this file metadata, encrypting it if 417// appropriate. 418// 419// If it is an encrypted file with a plaintext footer, then we will write the 420// signature with the unencrypted footer. 
421func (f *FileMetaData) WriteTo(w io.Writer, encryptor encryption.Encryptor) (int64, error) { 422 serializer := thrift.NewThriftSerializer() 423 // only in encrypted files with plaintext footers, the encryption algorithm is set in the footer 424 if f.IsSetEncryptionAlgorithm() { 425 data, err := serializer.Write(context.Background(), f.FileMetaData) 426 if err != nil { 427 return 0, err 428 } 429 430 // encrypt the footer key 431 var buf bytes.Buffer 432 buf.Grow(encryptor.CiphertextSizeDelta() + len(data)) 433 encryptedLen := encryptor.Encrypt(&buf, data) 434 435 wrote := 0 436 n := 0 437 // write unencrypted footer 438 if n, err = w.Write(data); err != nil { 439 return int64(n), err 440 } 441 wrote += n 442 // write signature (nonce and tag) 443 buf.Next(4) 444 if n, err = w.Write(buf.Next(encryption.NonceLength)); err != nil { 445 return int64(wrote + n), err 446 } 447 wrote += n 448 buf.Next(encryptedLen - 4 - encryption.NonceLength - encryption.GcmTagLength) 449 n, err = w.Write(buf.Next(encryption.GcmTagLength)) 450 return int64(wrote + n), err 451 } 452 n, err := serializer.Serialize(f.FileMetaData, w, encryptor) 453 return int64(n), err 454} 455 456// Version returns the "version" of the file 457// 458// WARNING: The value returned by this method is unreliable as 1) the 459// parquet file metadata stores the version as a single integer and 460// 2) some producers are known to always write a hardcoded value. Therefore 461// you cannot use this value to know which features are used in the file. 
462func (f *FileMetaData) Version() parquet.Version { 463 switch f.FileMetaData.Version { 464 case 1: 465 return parquet.V1_0 466 case 2: 467 return parquet.V2_LATEST 468 default: 469 // imporperly set version, assume parquet 1.0 470 return parquet.V1_0 471 } 472} 473 474// FileCryptoMetadata is a proxy for the thrift fileCryptoMetadata object 475type FileCryptoMetadata struct { 476 metadata *format.FileCryptoMetaData 477 cryptoMetadataLen uint32 478} 479 480// NewFileCryptoMetaData takes in the raw serialized bytes to deserialize 481// storing the number of bytes that were actually deserialized. 482func NewFileCryptoMetaData(metadata []byte) (ret FileCryptoMetadata, err error) { 483 ret.metadata = format.NewFileCryptoMetaData() 484 var remain uint64 485 remain, err = thrift.DeserializeThrift(ret.metadata, metadata) 486 ret.cryptoMetadataLen = uint32(uint64(len(metadata)) - remain) 487 return 488} 489 490// WriteTo writes out the serialized crypto metadata to w 491func (fc FileCryptoMetadata) WriteTo(w io.Writer) (int64, error) { 492 serializer := thrift.NewThriftSerializer() 493 n, err := serializer.Serialize(fc.metadata, w, nil) 494 return int64(n), err 495} 496 497// Len is the number of bytes that were deserialized to create this object 498func (fc FileCryptoMetadata) Len() int { return int(fc.cryptoMetadataLen) } 499 500func (fc FileCryptoMetadata) KeyMetadata() []byte { 501 return fc.metadata.KeyMetadata 502} 503 504// EncryptionAlgorithm constructs the object from the thrift instance of 505// the encryption algorithm 506func (fc FileCryptoMetadata) EncryptionAlgorithm() parquet.Algorithm { 507 return parquet.AlgorithmFromThrift(fc.metadata.GetEncryptionAlgorithm()) 508} 509