1// Licensed to the Apache Software Foundation (ASF) under one 2// or more contributor license agreements. See the NOTICE file 3// distributed with this work for additional information 4// regarding copyright ownership. The ASF licenses this file 5// to you under the Apache License, Version 2.0 (the 6// "License"); you may not use this file except in compliance 7// with the License. You may obtain a copy of the License at 8// 9// http://www.apache.org/licenses/LICENSE-2.0 10// 11// Unless required by applicable law or agreed to in writing, software 12// distributed under the License is distributed on an "AS IS" BASIS, 13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14// See the License for the specific language governing permissions and 15// limitations under the License. 16 17package metadata 18 19import ( 20 "bytes" 21 "encoding/binary" 22 "math" 23 "unsafe" 24 25 "github.com/apache/arrow/go/v6/arrow" 26 "github.com/apache/arrow/go/v6/arrow/memory" 27 "github.com/apache/arrow/go/v6/parquet" 28 "github.com/apache/arrow/go/v6/parquet/internal/debug" 29 "github.com/apache/arrow/go/v6/parquet/internal/encoding" 30 "github.com/apache/arrow/go/v6/parquet/internal/utils" 31 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" 32 "github.com/apache/arrow/go/v6/parquet/schema" 33) 34 35//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata statistics_types.gen.go.tmpl 36 37type StatProvider interface { 38 GetMin() []byte 39 GetMax() []byte 40 GetNullCount() int64 41 GetDistinctCount() int64 42 IsSetMax() bool 43 IsSetMin() bool 44 IsSetNullCount() bool 45 IsSetDistinctCount() bool 46} 47 48// EncodedStatistics are raw statistics with encoded values that will be written 49// to the parquet file, or was read from the parquet file. 50type EncodedStatistics struct { 51 HasMax bool 52 Max []byte 53 HasMin bool 54 Min []byte 55 Signed bool 56 HasNullCount bool 57 NullCount int64 58 HasDistinctCount bool 59 DistinctCount int64 60} 61 62// ApplyStatSizeLimits sets the maximum size of the min/max values. 63// 64// from parquet-mr 65// we don't write stats larger than the max size rather than truncating. 66// the rationale is that some engines may use the minimum value in the page 67// as the true minimum for aggregations and there is no way to mark that 68// a value has been truncated and is a lower bound and not in the page 69func (e *EncodedStatistics) ApplyStatSizeLimits(length int) { 70 if len(e.Max) > length { 71 e.HasMax = false 72 } 73 if len(e.Min) > length { 74 e.HasMin = false 75 } 76} 77 78// IsSet returns true iff one of the Has* values is true. 79func (e *EncodedStatistics) IsSet() bool { 80 return e.HasMin || e.HasMax || e.HasNullCount || e.HasDistinctCount 81} 82 83// SetMax sets the encoded Max value to val and sets HasMax to true 84func (e *EncodedStatistics) SetMax(val []byte) *EncodedStatistics { 85 e.Max = val[:] 86 e.HasMax = true 87 return e 88} 89 90// SetMin sets the encoded Min value to val, and sets HasMin to true 91func (e *EncodedStatistics) SetMin(val []byte) *EncodedStatistics { 92 e.Min = val[:] 93 e.HasMin = true 94 return e 95} 96 97// SetNullCount sets the NullCount to val and sets HasNullCount to true 98func (e *EncodedStatistics) SetNullCount(val int64) *EncodedStatistics { 99 e.NullCount = val 100 e.HasNullCount = true 101 return e 102} 103 104// SetDistinctCount sets the DistinctCount to val and sets HasDistinctCount to true 105func (e *EncodedStatistics) SetDistinctCount(val int64) *EncodedStatistics { 106 e.DistinctCount = val 107 e.HasDistinctCount = true 108 return e 109} 110 111func (e *EncodedStatistics) ToThrift() (stats *format.Statistics) { 112 stats = format.NewStatistics() 113 if e.HasMin { 114 stats.MinValue = e.Min 115 // if sort order is SIGNED then the old min value must be set too for backwards compatibility 116 if e.Signed { 117 stats.Min = e.Min 118 } 119 } 120 if e.HasMax { 121 stats.MaxValue = e.Max 122 // if sort order is SIGNED then old max value must be set to 123 if e.Signed { 124 stats.Max = e.Max 125 } 126 } 127 if e.HasNullCount { 128 stats.NullCount = &e.NullCount 129 } 130 if e.HasDistinctCount { 131 stats.DistinctCount = &e.DistinctCount 132 } 133 return 134} 135 136// TypedStatistics is the base interface for dealing with stats as 137// they are being populated 138type TypedStatistics interface { 139 // Type is the underlying physical type for this stat block 140 Type() parquet.Type 141 // Returns true if there is a min and max value set for this stat object 142 HasMinMax() bool 143 // Returns true if a nullcount has been set 144 HasNullCount() bool 145 // returns true only if a distinct count has been set 146 // current implementation does of the writer does not automatically populate 147 // the distinct count right now. 148 HasDistinctCount() bool 149 NullCount() int64 150 DistinctCount() int64 151 NumValues() int64 152 // return the column descriptor that this stat object was initialized with 153 Descr() *schema.Column 154 155 // Encode the current min value and return the bytes. ByteArray does not 156 // include the len in the encoded bytes, otherwise this is identical to 157 // plain encoding 158 EncodeMin() []byte 159 // Encode the current max value and return the bytes. ByteArray does not 160 // include the len in the encoded bytes, otherwise this is identical to 161 // plain encoding 162 EncodeMax() []byte 163 // Populate an EncodedStatistics object from the current stats 164 Encode() (EncodedStatistics, error) 165 // Resets all values to 0 to enable reusing this stat object for multiple 166 // columns, by calling Encode to get the finished values and then calling 167 // reset 168 Reset() 169 // Merge the min/max/nullcounts and distinct count from the passed stat object 170 // into this one. 171 Merge(TypedStatistics) 172} 173 174type statistics struct { 175 descr *schema.Column 176 hasMinMax bool 177 hasNullCount bool 178 hasDistinctCount bool 179 mem memory.Allocator 180 nvalues int64 181 stats EncodedStatistics 182 order schema.SortOrder 183 184 encoder encoding.TypedEncoder 185} 186 187func (s *statistics) incNulls(n int64) { 188 s.stats.NullCount += n 189 s.hasNullCount = true 190} 191func (s *statistics) incDistinct(n int64) { 192 s.stats.DistinctCount += n 193 s.hasDistinctCount = true 194} 195 196func (s *statistics) Descr() *schema.Column { return s.descr } 197func (s *statistics) Type() parquet.Type { return s.descr.PhysicalType() } 198func (s *statistics) HasDistinctCount() bool { return s.hasDistinctCount } 199func (s *statistics) HasMinMax() bool { return s.hasMinMax } 200func (s *statistics) HasNullCount() bool { return s.hasNullCount } 201func (s *statistics) NullCount() int64 { return s.stats.NullCount } 202func (s *statistics) DistinctCount() int64 { return s.stats.DistinctCount } 203func (s *statistics) NumValues() int64 { return s.nvalues } 204 205func (s *statistics) Reset() { 206 s.stats.NullCount = 0 207 s.stats.DistinctCount = 0 208 s.nvalues = 0 209 s.hasMinMax = false 210 s.hasDistinctCount = false 211 s.hasNullCount = false 212} 213 214// base merge function for base non-typed stat object so we don't have to 215// duplicate this in each of the typed implementations 216func (s *statistics) merge(other TypedStatistics) { 217 s.nvalues += other.NumValues() 218 if other.HasNullCount() { 219 s.stats.NullCount += other.NullCount() 220 } 221 if other.HasDistinctCount() { 222 // this isn't technically correct as it should be keeping an actual set 223 // of the distinct values and then combining the sets to get a new count 224 // but for now we'll do this to match the C++ implementation at the current 225 // time. 226 s.stats.DistinctCount += other.DistinctCount() 227 } 228} 229 230func coalesce(val, fallback interface{}) interface{} { 231 switch v := val.(type) { 232 case float32: 233 if math.IsNaN(float64(v)) { 234 return fallback 235 } 236 case float64: 237 if math.IsNaN(v) { 238 return fallback 239 } 240 } 241 return val 242} 243 244func signedByteLess(a, b []byte) bool { 245 // signed comparison is used for integers encoded as big-endian twos complement 246 // integers (e.g. decimals) 247 248 // if at least one of the lengths is zero, we can short circuit 249 if len(a) == 0 || len(b) == 0 { 250 return len(a) == 0 && len(b) > 0 251 } 252 253 sa := *(*[]int8)(unsafe.Pointer(&a)) 254 sb := *(*[]int8)(unsafe.Pointer(&b)) 255 256 // we can short circuit for different signd numbers or for equal length byte 257 // arrays that have different first bytes. The equality requirement is necessary 258 // for sign extension cases. 0xFF10 should be equal to 0x10 (due to big endian sign extension) 259 if int8(0x80&uint8(sa[0])) != int8(0x80&uint8(sb[0])) || (len(sa) == len(sb) && sa[0] != sb[0]) { 260 return sa[0] < sb[0] 261 } 262 263 // when the lengths are unequal and the numbers are of the same sign, we need 264 // to do comparison by sign extending the shorter value first, and once we get 265 // to equal sized arrays, lexicographical unsigned comparison of everything but 266 // the first byte is sufficient. 267 268 if len(a) != len(b) { 269 var lead []byte 270 if len(a) > len(b) { 271 leadLen := len(a) - len(b) 272 lead = a[:leadLen] 273 a = a[leadLen:] 274 } else { 275 debug.Assert(len(a) < len(b), "something weird in byte slice signed comparison") 276 leadLen := len(b) - len(a) 277 lead = b[:leadLen] 278 b = b[leadLen:] 279 } 280 281 // compare extra bytes to the sign extension of the first byte of the other number 282 var extension byte 283 if sa[0] < 0 { 284 extension = 0xFF 285 } 286 287 notequal := false 288 for _, c := range lead { 289 if c != extension { 290 notequal = true 291 break 292 } 293 } 294 295 if notequal { 296 // since sign extension are extrema values for unsigned bytes: 297 // 298 // Four cases exist: 299 // negative values: 300 // b is the longer value 301 // b must be the lesser value: return false 302 // else: 303 // a must be the lesser value: return true 304 // 305 // positive values: 306 // b is the longer value 307 // values in b must be greater than a: return true 308 // else: 309 // values in a must be greater than b: return false 310 neg := sa[0] < 0 311 blonger := len(sa) < len(sb) 312 return neg != blonger 313 } 314 } else { 315 a = a[1:] 316 b = b[1:] 317 } 318 319 return bytes.Compare(a, b) == -1 320} 321 322func (BooleanStatistics) defaultMin() bool { return true } 323func (BooleanStatistics) defaultMax() bool { return false } 324func (s *Int32Statistics) defaultMin() int32 { 325 if s.order == schema.SortUNSIGNED { 326 val := math.MaxUint32 327 return int32(val) 328 } 329 return math.MaxInt32 330} 331 332func (s *Int32Statistics) defaultMax() int32 { 333 if s.order == schema.SortUNSIGNED { 334 return int32(0) 335 } 336 return math.MinInt32 337} 338 339func (s *Int64Statistics) defaultMin() int64 { 340 if s.order == schema.SortUNSIGNED { 341 val := uint64(math.MaxUint64) 342 return int64(val) 343 } 344 return math.MaxInt64 345} 346 347func (s *Int64Statistics) defaultMax() int64 { 348 if s.order == schema.SortUNSIGNED { 349 return int64(0) 350 } 351 return math.MinInt64 352} 353 354var ( 355 defaultMinInt96 parquet.Int96 356 defaultMinUInt96 parquet.Int96 357 defaultMaxInt96 parquet.Int96 358 defaultMaxUInt96 parquet.Int96 359) 360 361func init() { 362 i96 := arrow.Uint32Traits.CastFromBytes(defaultMinInt96[:]) 363 i96[0] = math.MaxUint32 364 i96[1] = math.MaxUint32 365 i96[2] = math.MaxInt32 366 367 i96 = arrow.Uint32Traits.CastFromBytes(defaultMinUInt96[:]) 368 i96[0] = math.MaxUint32 369 i96[1] = math.MaxUint32 370 i96[2] = math.MaxUint32 371 372 // golang will initialize the bytes to 0 373 i96 = arrow.Uint32Traits.CastFromBytes(defaultMaxInt96[:]) 374 i96[2] = math.MaxInt32 + 1 375 376 // defaultMaxUInt96 will be initialized to 0 as desired 377} 378 379func (s *Int96Statistics) defaultMin() parquet.Int96 { 380 if s.order == schema.SortUNSIGNED { 381 return defaultMinUInt96 382 } 383 return defaultMinInt96 384} 385 386func (s *Int96Statistics) defaultMax() parquet.Int96 { 387 if s.order == schema.SortUNSIGNED { 388 return defaultMaxUInt96 389 } 390 return defaultMaxInt96 391} 392 393func (Float32Statistics) defaultMin() float32 { return math.MaxFloat32 } 394func (Float32Statistics) defaultMax() float32 { return -math.MaxFloat32 } 395func (Float64Statistics) defaultMin() float64 { return math.MaxFloat64 } 396func (Float64Statistics) defaultMax() float64 { return -math.MaxFloat64 } 397func (ByteArrayStatistics) defaultMin() parquet.ByteArray { return nil } 398func (ByteArrayStatistics) defaultMax() parquet.ByteArray { return nil } 399func (FixedLenByteArrayStatistics) defaultMin() parquet.FixedLenByteArray { return nil } 400func (FixedLenByteArrayStatistics) defaultMax() parquet.FixedLenByteArray { return nil } 401 402func (BooleanStatistics) equal(a, b bool) bool { return a == b } 403func (Int32Statistics) equal(a, b int32) bool { return a == b } 404func (Int64Statistics) equal(a, b int64) bool { return a == b } 405func (Float32Statistics) equal(a, b float32) bool { return a == b } 406func (Float64Statistics) equal(a, b float64) bool { return a == b } 407func (Int96Statistics) equal(a, b parquet.Int96) bool { return bytes.Equal(a[:], b[:]) } 408func (ByteArrayStatistics) equal(a, b parquet.ByteArray) bool { return bytes.Equal(a, b) } 409func (FixedLenByteArrayStatistics) equal(a, b parquet.FixedLenByteArray) bool { 410 return bytes.Equal(a, b) 411} 412 413func (BooleanStatistics) less(a, b bool) bool { 414 return !a && b 415} 416 417func (s *Int32Statistics) less(a, b int32) bool { 418 if s.order == schema.SortUNSIGNED { 419 return uint32(a) < uint32(b) 420 } 421 return a < b 422} 423 424func (s *Int64Statistics) less(a, b int64) bool { 425 if s.order == schema.SortUNSIGNED { 426 return uint64(a) < uint64(b) 427 } 428 return a < b 429} 430func (Float32Statistics) less(a, b float32) bool { return a < b } 431func (Float64Statistics) less(a, b float64) bool { return a < b } 432func (s *Int96Statistics) less(a, b parquet.Int96) bool { 433 i96a := arrow.Uint32Traits.CastFromBytes(a[:]) 434 i96b := arrow.Uint32Traits.CastFromBytes(b[:]) 435 436 a0, a1, a2 := utils.ToLEUint32(i96a[0]), utils.ToLEUint32(i96a[1]), utils.ToLEUint32(i96a[2]) 437 b0, b1, b2 := utils.ToLEUint32(i96b[0]), utils.ToLEUint32(i96b[1]), utils.ToLEUint32(i96b[2]) 438 439 if a2 != b2 { 440 // only the msb bit is by signed comparison 441 if s.order == schema.SortSIGNED { 442 return int32(a2) < int32(b2) 443 } 444 return a2 < b2 445 } else if a1 != b1 { 446 return a1 < b1 447 } 448 return a0 < b0 449} 450 451func (s *ByteArrayStatistics) less(a, b parquet.ByteArray) bool { 452 if s.order == schema.SortUNSIGNED { 453 return bytes.Compare(a, b) == -1 454 } 455 456 return signedByteLess([]byte(a), []byte(b)) 457} 458 459func (s *FixedLenByteArrayStatistics) less(a, b parquet.FixedLenByteArray) bool { 460 if s.order == schema.SortUNSIGNED { 461 return bytes.Compare(a, b) == -1 462 } 463 464 return signedByteLess([]byte(a), []byte(b)) 465} 466 467func (BooleanStatistics) cleanStat(minMax minmaxPairBoolean) *minmaxPairBoolean { return &minMax } 468func (Int32Statistics) cleanStat(minMax minmaxPairInt32) *minmaxPairInt32 { return &minMax } 469func (Int64Statistics) cleanStat(minMax minmaxPairInt64) *minmaxPairInt64 { return &minMax } 470func (Int96Statistics) cleanStat(minMax minmaxPairInt96) *minmaxPairInt96 { return &minMax } 471 472// in the case of floating point types, the following rules are applied as per parquet-mr: 473// - if any of min/max is NaN, return nothing 474// - if min is 0.0f replace with -0.0f 475// - if max is -0.0f replace with 0.0f 476// 477// https://issues.apache.org/jira/browse/PARQUET-1222 tracks the official documenting of 478// a well-defined order for floats and doubles. 479func (Float32Statistics) cleanStat(minMax minmaxPairFloat32) *minmaxPairFloat32 { 480 if math.IsNaN(float64(minMax[0])) || math.IsNaN(float64(minMax[1])) { 481 return nil 482 } 483 484 if minMax[0] == math.MaxFloat32 && minMax[1] == -math.MaxFloat32 { 485 return nil 486 } 487 488 var zero float32 = 0 489 if minMax[0] == zero && !math.Signbit(float64(minMax[0])) { 490 minMax[0] = -minMax[0] 491 } 492 493 if minMax[1] == zero && math.Signbit(float64(minMax[1])) { 494 minMax[1] = -minMax[1] 495 } 496 497 return &minMax 498} 499 500func (Float64Statistics) cleanStat(minMax minmaxPairFloat64) *minmaxPairFloat64 { 501 if math.IsNaN(minMax[0]) || math.IsNaN(minMax[1]) { 502 return nil 503 } 504 505 if minMax[0] == math.MaxFloat64 && minMax[1] == -math.MaxFloat64 { 506 return nil 507 } 508 509 var zero float64 = 0 510 if minMax[0] == zero && !math.Signbit(minMax[0]) { 511 minMax[0] = -minMax[0] 512 } 513 514 if minMax[1] == zero && math.Signbit(minMax[1]) { 515 minMax[1] = -minMax[1] 516 } 517 518 return &minMax 519} 520 521func (ByteArrayStatistics) cleanStat(minMax minmaxPairByteArray) *minmaxPairByteArray { 522 if minMax[0] == nil || minMax[1] == nil { 523 return nil 524 } 525 return &minMax 526} 527 528func (FixedLenByteArrayStatistics) cleanStat(minMax minmaxPairFixedLenByteArray) *minmaxPairFixedLenByteArray { 529 if minMax[0] == nil || minMax[1] == nil { 530 return nil 531 } 532 return &minMax 533} 534 535func GetStatValue(typ parquet.Type, val []byte) interface{} { 536 switch typ { 537 case parquet.Types.Boolean: 538 return val[0] != 0 539 case parquet.Types.Int32: 540 return int32(binary.LittleEndian.Uint32(val)) 541 case parquet.Types.Int64: 542 return int64(binary.LittleEndian.Uint64(val)) 543 case parquet.Types.Int96: 544 p := parquet.Int96{} 545 copy(p[:], val) 546 return p 547 case parquet.Types.Float: 548 return math.Float32frombits(binary.LittleEndian.Uint32(val)) 549 case parquet.Types.Double: 550 return math.Float64frombits(binary.LittleEndian.Uint64(val)) 551 case parquet.Types.ByteArray: 552 fallthrough 553 case parquet.Types.FixedLenByteArray: 554 return val 555 } 556 return nil 557} 558