1// Licensed to the Apache Software Foundation (ASF) under one 2// or more contributor license agreements. See the NOTICE file 3// distributed with this work for additional information 4// regarding copyright ownership. The ASF licenses this file 5// to you under the Apache License, Version 2.0 (the 6// "License"); you may not use this file except in compliance 7// with the License. You may obtain a copy of the License at 8// 9// http://www.apache.org/licenses/LICENSE-2.0 10// 11// Unless required by applicable law or agreed to in writing, software 12// distributed under the License is distributed on an "AS IS" BASIS, 13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14// See the License for the specific language governing permissions and 15// limitations under the License. 16 17package metadata 18 19import ( 20 "bytes" 21 "context" 22 "io" 23 "reflect" 24 25 "github.com/apache/arrow/go/v6/arrow/memory" 26 "github.com/apache/arrow/go/v6/parquet" 27 "github.com/apache/arrow/go/v6/parquet/compress" 28 "github.com/apache/arrow/go/v6/parquet/internal/encryption" 29 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" 30 "github.com/apache/arrow/go/v6/parquet/internal/thrift" 31 "github.com/apache/arrow/go/v6/parquet/schema" 32 "golang.org/x/xerrors" 33) 34 35// PageEncodingStats is used for counting the number of pages of specific 36// types with the given internal encoding. 37type PageEncodingStats struct { 38 Encoding parquet.Encoding 39 PageType format.PageType 40} 41 42type statvalues struct { 43 *format.Statistics 44} 45 46func (s *statvalues) GetMin() []byte { return s.GetMinValue() } 47func (s *statvalues) GetMax() []byte { return s.GetMaxValue() } 48func (s *statvalues) IsSetMin() bool { return s.IsSetMinValue() } 49func (s *statvalues) IsSetMax() bool { return s.IsSetMaxValue() } 50 51func makeColumnStats(metadata *format.ColumnMetaData, descr *schema.Column, mem memory.Allocator) TypedStatistics { 52 if descr.ColumnOrder() == parquet.ColumnOrders.TypeDefinedOrder { 53 return NewStatisticsFromEncoded(descr, mem, 54 metadata.NumValues-metadata.Statistics.GetNullCount(), 55 &statvalues{metadata.Statistics}) 56 } 57 return NewStatisticsFromEncoded(descr, mem, 58 metadata.NumValues-metadata.Statistics.GetNullCount(), 59 metadata.Statistics) 60} 61 62// ColumnChunkMetaData is a proxy around format.ColumnChunkMetaData 63// containing all of the information and metadata for a given column chunk 64// and it's associated Column 65type ColumnChunkMetaData struct { 66 column *format.ColumnChunk 67 columnMeta *format.ColumnMetaData 68 decryptedMeta format.ColumnMetaData 69 descr *schema.Column 70 writerVersion *AppVersion 71 encodings []parquet.Encoding 72 encodingStats []format.PageEncodingStats 73 possibleStats TypedStatistics 74 mem memory.Allocator 75} 76 77// NewColumnChunkMetaData creates an instance of the metadata from a column chunk and descriptor 78// 79// this is primarily used internally or between the subpackages. ColumnChunkMetaDataBuilder should 80// be used by consumers instead of using this directly. 81func NewColumnChunkMetaData(column *format.ColumnChunk, descr *schema.Column, writerVersion *AppVersion, rowGroupOrdinal, columnOrdinal int16, fileDecryptor encryption.FileDecryptor) (*ColumnChunkMetaData, error) { 82 c := &ColumnChunkMetaData{ 83 column: column, 84 columnMeta: column.GetMetaData(), 85 descr: descr, 86 writerVersion: writerVersion, 87 mem: memory.DefaultAllocator, 88 } 89 if column.IsSetCryptoMetadata() { 90 ccmd := column.CryptoMetadata 91 92 if ccmd.IsSetENCRYPTION_WITH_COLUMN_KEY() { 93 if fileDecryptor != nil && fileDecryptor.Properties() != nil { 94 // should decrypt metadata 95 path := parquet.ColumnPath(ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetPathInSchema()) 96 keyMetadata := ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetKeyMetadata() 97 aadColumnMetadata := encryption.CreateModuleAad(fileDecryptor.FileAad(), encryption.ColumnMetaModule, rowGroupOrdinal, columnOrdinal, -1) 98 decryptor := fileDecryptor.GetColumnMetaDecryptor(path.String(), string(keyMetadata), aadColumnMetadata) 99 thrift.DeserializeThrift(&c.decryptedMeta, decryptor.Decrypt(column.GetEncryptedColumnMetadata())) 100 c.columnMeta = &c.decryptedMeta 101 } else { 102 return nil, xerrors.New("cannot decrypt column metadata. file decryption not setup correctly") 103 } 104 } 105 } 106 for _, enc := range c.columnMeta.Encodings { 107 c.encodings = append(c.encodings, parquet.Encoding(enc)) 108 } 109 for _, enc := range c.columnMeta.EncodingStats { 110 c.encodingStats = append(c.encodingStats, *enc) 111 } 112 return c, nil 113} 114 115// CryptoMetadata returns the cryptographic metadata for how this column was 116// encrypted and how to decrypt it. 117func (c *ColumnChunkMetaData) CryptoMetadata() *format.ColumnCryptoMetaData { 118 return c.column.GetCryptoMetadata() 119} 120 121// FileOffset is the location in the file where the column data begins 122func (c *ColumnChunkMetaData) FileOffset() int64 { return c.column.FileOffset } 123 124// FilePath gives the name of the parquet file if provided in the metadata 125func (c *ColumnChunkMetaData) FilePath() string { return c.column.GetFilePath() } 126 127// Type is the physical storage type used in the parquet file for this column chunk. 128func (c *ColumnChunkMetaData) Type() parquet.Type { return parquet.Type(c.columnMeta.Type) } 129 130// NumValues is the number of values stored in just this chunk including nulls. 131func (c *ColumnChunkMetaData) NumValues() int64 { return c.columnMeta.NumValues } 132 133// PathInSchema is the full path to this column from the root of the schema including 134// any nested columns 135func (c *ColumnChunkMetaData) PathInSchema() parquet.ColumnPath { 136 return c.columnMeta.GetPathInSchema() 137} 138 139// Compression provides the type of compression used for this particular chunk. 140func (c *ColumnChunkMetaData) Compression() compress.Compression { 141 return compress.Compression(c.columnMeta.Codec) 142} 143 144// Encodings returns the list of different encodings used in this chunk 145func (c *ColumnChunkMetaData) Encodings() []parquet.Encoding { return c.encodings } 146 147// EncodingStats connects the order of encodings based on the list of pages and types 148func (c *ColumnChunkMetaData) EncodingStats() []PageEncodingStats { 149 ret := make([]PageEncodingStats, len(c.encodingStats)) 150 for idx := range ret { 151 ret[idx].Encoding = parquet.Encoding(c.encodingStats[idx].Encoding) 152 ret[idx].PageType = c.encodingStats[idx].PageType 153 } 154 return ret 155} 156 157// HasDictionaryPage returns true if there is a dictionary page offset set in 158// this metadata. 159func (c *ColumnChunkMetaData) HasDictionaryPage() bool { 160 return c.columnMeta.IsSetDictionaryPageOffset() 161} 162 163// DictionaryPageOffset returns the location in the file where the dictionary page starts 164func (c *ColumnChunkMetaData) DictionaryPageOffset() int64 { 165 return c.columnMeta.GetDictionaryPageOffset() 166} 167 168// DataPageOffset returns the location in the file where the data pages begin for this column 169func (c *ColumnChunkMetaData) DataPageOffset() int64 { return c.columnMeta.GetDataPageOffset() } 170 171// HasIndexPage returns true if the offset for the index page is set in the metadata 172func (c *ColumnChunkMetaData) HasIndexPage() bool { return c.columnMeta.IsSetIndexPageOffset() } 173 174// IndexPageOffset is the location in the file where the index page starts. 175func (c *ColumnChunkMetaData) IndexPageOffset() int64 { return c.columnMeta.GetIndexPageOffset() } 176 177// TotalCompressedSize will be equal to TotalUncompressedSize if the data is not compressed. 178// Otherwise this will be the size of the actual data in the file. 179func (c *ColumnChunkMetaData) TotalCompressedSize() int64 { 180 return c.columnMeta.GetTotalCompressedSize() 181} 182 183// TotalUncompressedSize is the total size of the raw data after uncompressing the chunk 184func (c *ColumnChunkMetaData) TotalUncompressedSize() int64 { 185 return c.columnMeta.GetTotalUncompressedSize() 186} 187 188// BloomFilterOffset is the byte offset from the beginning of the file to the bloom 189// filter data. 190func (c *ColumnChunkMetaData) BloomFilterOffset() int64 { 191 return c.columnMeta.GetBloomFilterOffset() 192} 193 194// StatsSet returns true only if there are statistics set in the metadata and the column 195// descriptor has a sort order that is not SortUnknown 196// 197// It also checks the writer version to ensure that it was not written by a version 198// of parquet which is known to have incorrect stat computations. 199func (c *ColumnChunkMetaData) StatsSet() (bool, error) { 200 if !c.columnMeta.IsSetStatistics() || c.descr.SortOrder() == schema.SortUNKNOWN { 201 return false, nil 202 } 203 204 if c.possibleStats == nil { 205 c.possibleStats = makeColumnStats(c.columnMeta, c.descr, c.mem) 206 } 207 208 encoded, err := c.possibleStats.Encode() 209 if err != nil { 210 return false, err 211 } 212 213 return c.writerVersion.HasCorrectStatistics(c.Type(), c.descr.LogicalType(), encoded, c.descr.SortOrder()), nil 214} 215 216func (c *ColumnChunkMetaData) Equals(other *ColumnChunkMetaData) bool { 217 return reflect.DeepEqual(c.columnMeta, other.columnMeta) 218} 219 220// Statistics can return nil if there are no stats in this metadata 221func (c *ColumnChunkMetaData) Statistics() (TypedStatistics, error) { 222 ok, err := c.StatsSet() 223 if err != nil { 224 return nil, err 225 } 226 227 if ok { 228 return c.possibleStats, nil 229 } 230 return nil, nil 231} 232 233// ColumnChunkMetaDataBuilder is used during writing to construct metadata 234// for a given column chunk while writing, providing a proxy around constructing 235// the actual thrift object. 236type ColumnChunkMetaDataBuilder struct { 237 chunk *format.ColumnChunk 238 props *parquet.WriterProperties 239 column *schema.Column 240 241 compressedSize int64 242} 243 244func NewColumnChunkMetaDataBuilder(props *parquet.WriterProperties, column *schema.Column) *ColumnChunkMetaDataBuilder { 245 return NewColumnChunkMetaDataBuilderWithContents(props, column, format.NewColumnChunk()) 246} 247 248// NewColumnChunkMetaDataBuilderWithContents will construct a builder and start it with the provided 249// column chunk information rather than with an empty column chunk. 250func NewColumnChunkMetaDataBuilderWithContents(props *parquet.WriterProperties, column *schema.Column, chunk *format.ColumnChunk) *ColumnChunkMetaDataBuilder { 251 b := &ColumnChunkMetaDataBuilder{ 252 props: props, 253 column: column, 254 chunk: chunk, 255 } 256 b.init(chunk) 257 return b 258} 259 260// Contents returns the underlying thrift ColumnChunk object so that it can be used 261// for constructing or duplicating column metadata 262func (c *ColumnChunkMetaDataBuilder) Contents() *format.ColumnChunk { return c.chunk } 263 264func (c *ColumnChunkMetaDataBuilder) init(chunk *format.ColumnChunk) { 265 c.chunk = chunk 266 if !c.chunk.IsSetMetaData() { 267 c.chunk.MetaData = format.NewColumnMetaData() 268 } 269 c.chunk.MetaData.Type = format.Type(c.column.PhysicalType()) 270 c.chunk.MetaData.PathInSchema = schema.ColumnPathFromNode(c.column.SchemaNode()) 271 c.chunk.MetaData.Codec = format.CompressionCodec(c.props.CompressionFor(c.column.Path())) 272} 273 274func (c *ColumnChunkMetaDataBuilder) SetFilePath(val string) { 275 c.chunk.FilePath = &val 276} 277 278// Descr returns the associated column descriptor for this column chunk 279func (c *ColumnChunkMetaDataBuilder) Descr() *schema.Column { return c.column } 280 281func (c *ColumnChunkMetaDataBuilder) TotalCompressedSize() int64 { 282 // if this column is encrypted, after Finish is called, the MetaData 283 // field is set to nil and we store the compressed size so return that 284 if c.chunk.MetaData == nil { 285 return c.compressedSize 286 } 287 return c.chunk.MetaData.GetTotalCompressedSize() 288} 289 290func (c *ColumnChunkMetaDataBuilder) SetStats(val EncodedStatistics) { 291 c.chunk.MetaData.Statistics = val.ToThrift() 292} 293 294// ChunkMetaInfo is a helper struct for passing the offset and size information 295// for finishing the building of column chunk metadata 296type ChunkMetaInfo struct { 297 NumValues int64 298 DictPageOffset int64 299 IndexPageOffset int64 300 DataPageOffset int64 301 CompressedSize int64 302 UncompressedSize int64 303} 304 305// EncodingStats is a helper struct for passing the encoding stat information 306// for finishing up metadata for a column chunk. 307type EncodingStats struct { 308 DictEncodingStats map[parquet.Encoding]int32 309 DataEncodingStats map[parquet.Encoding]int32 310} 311 312// Finish finalizes the metadata with the given offsets, 313// flushes any compression that needs to be done, and performs 314// any encryption if an encryptor is provided. 315func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict, dictFallback bool, encStats EncodingStats, metaEncryptor encryption.Encryptor) error { 316 if info.DictPageOffset > 0 { 317 c.chunk.MetaData.DictionaryPageOffset = &info.DictPageOffset 318 c.chunk.FileOffset = info.DictPageOffset + info.CompressedSize 319 } else { 320 c.chunk.FileOffset = info.DataPageOffset + info.CompressedSize 321 } 322 323 c.chunk.MetaData.NumValues = info.NumValues 324 if info.IndexPageOffset >= 0 { 325 c.chunk.MetaData.IndexPageOffset = &info.IndexPageOffset 326 } 327 328 c.chunk.MetaData.DataPageOffset = info.DataPageOffset 329 c.chunk.MetaData.TotalUncompressedSize = info.UncompressedSize 330 c.chunk.MetaData.TotalCompressedSize = info.CompressedSize 331 332 // no matter the configuration, the maximum number of thrift encodings we'll 333 // populate is going to be 3: 334 // 1. potential dictionary index encoding 335 // 2. page encoding 336 // 3. RLE for repetition and definition levels 337 // so let's preallocate a capacity of 3 but initialize the slice at 0 len 338 const maxEncodings = 3 339 340 thriftEncodings := make([]format.Encoding, 0, maxEncodings) 341 if hasDict { 342 thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryIndexEncoding())) 343 if c.props.Version() == parquet.V1_0 { 344 thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN) 345 } else { 346 thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryPageEncoding())) 347 } 348 } else { // no dictionary 349 thriftEncodings = append(thriftEncodings, format.Encoding(c.props.EncodingFor(c.column.Path()))) 350 } 351 352 thriftEncodings = append(thriftEncodings, format.Encoding(parquet.Encodings.RLE)) 353 // Only PLAIN encoding is supported for fallback in V1 354 // TODO(zeroshade): Use user specified encoding for V2 355 if dictFallback { 356 thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN) 357 } 358 c.chunk.MetaData.Encodings = thriftEncodings 359 360 thriftEncodingStats := make([]*format.PageEncodingStats, 0, len(encStats.DictEncodingStats)+len(encStats.DataEncodingStats)) 361 for k, v := range encStats.DictEncodingStats { 362 thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{ 363 PageType: format.PageType_DICTIONARY_PAGE, 364 Encoding: format.Encoding(k), 365 Count: v, 366 }) 367 } 368 for k, v := range encStats.DataEncodingStats { 369 thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{ 370 PageType: format.PageType_DATA_PAGE, 371 Encoding: format.Encoding(k), 372 Count: v, 373 }) 374 } 375 c.chunk.MetaData.EncodingStats = thriftEncodingStats 376 377 encryptProps := c.props.ColumnEncryptionProperties(c.column.Path()) 378 if encryptProps != nil && encryptProps.IsEncrypted() { 379 ccmd := format.NewColumnCryptoMetaData() 380 if encryptProps.IsEncryptedWithFooterKey() { 381 ccmd.ENCRYPTION_WITH_FOOTER_KEY = format.NewEncryptionWithFooterKey() 382 } else { 383 ccmd.ENCRYPTION_WITH_COLUMN_KEY = &format.EncryptionWithColumnKey{ 384 KeyMetadata: []byte(encryptProps.KeyMetadata()), 385 PathInSchema: c.column.ColumnPath(), 386 } 387 } 388 c.chunk.CryptoMetadata = ccmd 389 390 encryptedFooter := c.props.FileEncryptionProperties().EncryptedFooter() 391 encryptMetadata := !encryptedFooter || !encryptProps.IsEncryptedWithFooterKey() 392 if encryptMetadata { 393 // Serialize and encrypt ColumnMetadata separately 394 // Thrift-serialize the ColumnMetaData structure, 395 // encrypt it with the column key, and write to encrypted_column_metadata 396 serializer := thrift.NewThriftSerializer() 397 data, err := serializer.Write(context.Background(), c.chunk.MetaData) 398 if err != nil { 399 return err 400 } 401 var buf bytes.Buffer 402 metaEncryptor.Encrypt(&buf, data) 403 c.chunk.EncryptedColumnMetadata = buf.Bytes() 404 405 if encryptedFooter { 406 c.compressedSize = c.chunk.MetaData.GetTotalCompressedSize() 407 c.chunk.MetaData = nil 408 } else { 409 // Keep redacted metadata version for old readers 410 c.chunk.MetaData.Statistics = nil 411 c.chunk.MetaData.EncodingStats = nil 412 } 413 } 414 } 415 return nil 416} 417 418// WriteTo will always return 0 as the int64 since the thrift writer library 419// does not return the number of bytes written, we only use the signature 420// of (int64, error) in order to match the standard WriteTo interfaces. 421func (c *ColumnChunkMetaDataBuilder) WriteTo(w io.Writer) (int64, error) { 422 return 0, thrift.SerializeThriftStream(c.chunk, w) 423} 424