1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package metadata
18
19import (
20	"bytes"
21	"encoding/binary"
22	"math"
23	"unsafe"
24
25	"github.com/apache/arrow/go/v6/arrow"
26	"github.com/apache/arrow/go/v6/arrow/memory"
27	"github.com/apache/arrow/go/v6/parquet"
28	"github.com/apache/arrow/go/v6/parquet/internal/debug"
29	"github.com/apache/arrow/go/v6/parquet/internal/encoding"
30	"github.com/apache/arrow/go/v6/parquet/internal/utils"
31	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
32	"github.com/apache/arrow/go/v6/parquet/schema"
33)
34
35//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata statistics_types.gen.go.tmpl
36
// StatProvider is the set of accessors needed to read a block of raw column
// statistics: the min and max values as bytes, the null and distinct counts,
// and an IsSet* check for each of those four values.
//
// NOTE(review): the method set looks like the getters of the thrift-generated
// statistics type — confirm against the implementers.
type StatProvider interface {
	// GetMin returns the min value as raw bytes.
	GetMin() []byte
	// GetMax returns the max value as raw bytes.
	GetMax() []byte
	// GetNullCount returns the null count.
	GetNullCount() int64
	// GetDistinctCount returns the distinct count.
	GetDistinctCount() int64
	// IsSetMax reports whether a max value is present.
	IsSetMax() bool
	// IsSetMin reports whether a min value is present.
	IsSetMin() bool
	// IsSetNullCount reports whether a null count is present.
	IsSetNullCount() bool
	// IsSetDistinctCount reports whether a distinct count is present.
	IsSetDistinctCount() bool
}
47
// EncodedStatistics are raw statistics with encoded values that will be written
// to the parquet file, or were read from the parquet file.
//
// Each value is paired with a Has* flag; a value is only meaningful while its
// flag is true.
type EncodedStatistics struct {
	HasMax           bool   // true when Max should be used (set by SetMax, cleared by ApplyStatSizeLimits)
	Max              []byte // encoded max value
	HasMin           bool   // true when Min should be used (set by SetMin, cleared by ApplyStatSizeLimits)
	Min              []byte // encoded min value
	Signed           bool   // when true, ToThrift also fills the legacy thrift Min/Max fields
	HasNullCount     bool   // true once NullCount has been set
	NullCount        int64  // null count
	HasDistinctCount bool   // true once DistinctCount has been set
	DistinctCount    int64  // distinct count
}
61
62// ApplyStatSizeLimits sets the maximum size of the min/max values.
63//
64// from parquet-mr
65// we don't write stats larger than the max size rather than truncating.
66// the rationale is that some engines may use the minimum value in the page
67// as the true minimum for aggregations and there is no way to mark that
68// a value has been truncated and is a lower bound and not in the page
69func (e *EncodedStatistics) ApplyStatSizeLimits(length int) {
70	if len(e.Max) > length {
71		e.HasMax = false
72	}
73	if len(e.Min) > length {
74		e.HasMin = false
75	}
76}
77
78// IsSet returns true iff one of the Has* values is true.
79func (e *EncodedStatistics) IsSet() bool {
80	return e.HasMin || e.HasMax || e.HasNullCount || e.HasDistinctCount
81}
82
83// SetMax sets the encoded Max value to val and sets HasMax to true
84func (e *EncodedStatistics) SetMax(val []byte) *EncodedStatistics {
85	e.Max = val[:]
86	e.HasMax = true
87	return e
88}
89
90// SetMin sets the encoded Min value to val, and sets HasMin to true
91func (e *EncodedStatistics) SetMin(val []byte) *EncodedStatistics {
92	e.Min = val[:]
93	e.HasMin = true
94	return e
95}
96
97// SetNullCount sets the NullCount to val and sets HasNullCount to true
98func (e *EncodedStatistics) SetNullCount(val int64) *EncodedStatistics {
99	e.NullCount = val
100	e.HasNullCount = true
101	return e
102}
103
104// SetDistinctCount sets the DistinctCount to val and sets HasDistinctCount to true
105func (e *EncodedStatistics) SetDistinctCount(val int64) *EncodedStatistics {
106	e.DistinctCount = val
107	e.HasDistinctCount = true
108	return e
109}
110
111func (e *EncodedStatistics) ToThrift() (stats *format.Statistics) {
112	stats = format.NewStatistics()
113	if e.HasMin {
114		stats.MinValue = e.Min
115		// if sort order is SIGNED then the old min value must be set too for backwards compatibility
116		if e.Signed {
117			stats.Min = e.Min
118		}
119	}
120	if e.HasMax {
121		stats.MaxValue = e.Max
122		// if sort order is SIGNED then old max value must be set to
123		if e.Signed {
124			stats.Max = e.Max
125		}
126	}
127	if e.HasNullCount {
128		stats.NullCount = &e.NullCount
129	}
130	if e.HasDistinctCount {
131		stats.DistinctCount = &e.DistinctCount
132	}
133	return
134}
135
// TypedStatistics is the base interface for dealing with stats as
// they are being populated
type TypedStatistics interface {
	// Type is the underlying physical type for this stat block
	Type() parquet.Type
	// HasMinMax returns true if there is a min and max value set for this
	// stat object
	HasMinMax() bool
	// HasNullCount returns true if a null count has been set
	HasNullCount() bool
	// HasDistinctCount returns true only if a distinct count has been set.
	// The current implementation of the writer does not automatically
	// populate the distinct count right now.
	HasDistinctCount() bool
	// NullCount returns the current null count
	NullCount() int64
	// DistinctCount returns the current distinct count
	DistinctCount() int64
	// NumValues returns the current value count
	NumValues() int64
	// Descr returns the column descriptor that this stat object was
	// initialized with
	Descr() *schema.Column

	// Encode the current min value and return the bytes. ByteArray does not
	// include the len in the encoded bytes, otherwise this is identical to
	// plain encoding
	EncodeMin() []byte
	// Encode the current max value and return the bytes. ByteArray does not
	// include the len in the encoded bytes, otherwise this is identical to
	// plain encoding
	EncodeMax() []byte
	// Populate an EncodedStatistics object from the current stats
	Encode() (EncodedStatistics, error)
	// Resets all values to 0 to enable reusing this stat object for multiple
	// columns, by calling Encode to get the finished values and then calling
	// reset
	Reset()
	// Merge the min/max/nullcounts and distinct count from the passed stat object
	// into this one.
	Merge(TypedStatistics)
}
173
// statistics is the untyped base state behind the stat objects: the column
// descriptor, the has* flags, the value count and the running null/distinct
// counters kept in stats.
type statistics struct {
	descr            *schema.Column    // column descriptor, returned by Descr and used by Type
	hasMinMax        bool              // reported by HasMinMax
	hasNullCount     bool              // reported by HasNullCount, set by incNulls
	hasDistinctCount bool              // reported by HasDistinctCount, set by incDistinct
	mem              memory.Allocator  // NOTE(review): not used in this part of the file — presumably for the typed implementations; confirm
	nvalues          int64             // reported by NumValues
	stats            EncodedStatistics // holds the running NullCount and DistinctCount
	order            schema.SortOrder  // selects signed vs. unsigned comparison and default values

	encoder encoding.TypedEncoder // NOTE(review): not used in this part of the file — presumably encodes min/max; confirm
}
186
187func (s *statistics) incNulls(n int64) {
188	s.stats.NullCount += n
189	s.hasNullCount = true
190}
191func (s *statistics) incDistinct(n int64) {
192	s.stats.DistinctCount += n
193	s.hasDistinctCount = true
194}
195
196func (s *statistics) Descr() *schema.Column  { return s.descr }
197func (s *statistics) Type() parquet.Type     { return s.descr.PhysicalType() }
198func (s *statistics) HasDistinctCount() bool { return s.hasDistinctCount }
199func (s *statistics) HasMinMax() bool        { return s.hasMinMax }
200func (s *statistics) HasNullCount() bool     { return s.hasNullCount }
201func (s *statistics) NullCount() int64       { return s.stats.NullCount }
202func (s *statistics) DistinctCount() int64   { return s.stats.DistinctCount }
203func (s *statistics) NumValues() int64       { return s.nvalues }
204
205func (s *statistics) Reset() {
206	s.stats.NullCount = 0
207	s.stats.DistinctCount = 0
208	s.nvalues = 0
209	s.hasMinMax = false
210	s.hasDistinctCount = false
211	s.hasNullCount = false
212}
213
214// base merge function for base non-typed stat object so we don't have to
215// duplicate this in each of the typed implementations
216func (s *statistics) merge(other TypedStatistics) {
217	s.nvalues += other.NumValues()
218	if other.HasNullCount() {
219		s.stats.NullCount += other.NullCount()
220	}
221	if other.HasDistinctCount() {
222		// this isn't technically correct as it should be keeping an actual set
223		// of the distinct values and then combining the sets to get a new count
224		// but for now we'll do this to match the C++ implementation at the current
225		// time.
226		s.stats.DistinctCount += other.DistinctCount()
227	}
228}
229
// coalesce returns fallback when val is a float32 or float64 holding NaN, and
// val itself in every other case (including non-float values).
func coalesce(val, fallback interface{}) interface{} {
	if f, ok := val.(float32); ok && math.IsNaN(float64(f)) {
		return fallback
	}
	if f, ok := val.(float64); ok && math.IsNaN(f) {
		return fallback
	}
	return val
}
243
244func signedByteLess(a, b []byte) bool {
245	// signed comparison is used for integers encoded as big-endian twos complement
246	// integers (e.g. decimals)
247
248	// if at least one of the lengths is zero, we can short circuit
249	if len(a) == 0 || len(b) == 0 {
250		return len(a) == 0 && len(b) > 0
251	}
252
253	sa := *(*[]int8)(unsafe.Pointer(&a))
254	sb := *(*[]int8)(unsafe.Pointer(&b))
255
256	// we can short circuit for different signd numbers or for equal length byte
257	// arrays that have different first bytes. The equality requirement is necessary
258	// for sign extension cases. 0xFF10 should be equal to 0x10 (due to big endian sign extension)
259	if int8(0x80&uint8(sa[0])) != int8(0x80&uint8(sb[0])) || (len(sa) == len(sb) && sa[0] != sb[0]) {
260		return sa[0] < sb[0]
261	}
262
263	// when the lengths are unequal and the numbers are of the same sign, we need
264	// to do comparison by sign extending the shorter value first, and once we get
265	// to equal sized arrays, lexicographical unsigned comparison of everything but
266	// the first byte is sufficient.
267
268	if len(a) != len(b) {
269		var lead []byte
270		if len(a) > len(b) {
271			leadLen := len(a) - len(b)
272			lead = a[:leadLen]
273			a = a[leadLen:]
274		} else {
275			debug.Assert(len(a) < len(b), "something weird in byte slice signed comparison")
276			leadLen := len(b) - len(a)
277			lead = b[:leadLen]
278			b = b[leadLen:]
279		}
280
281		// compare extra bytes to the sign extension of the first byte of the other number
282		var extension byte
283		if sa[0] < 0 {
284			extension = 0xFF
285		}
286
287		notequal := false
288		for _, c := range lead {
289			if c != extension {
290				notequal = true
291				break
292			}
293		}
294
295		if notequal {
296			// since sign extension are extrema values for unsigned bytes:
297			//
298			// Four cases exist:
299			//	 negative values:
300			//	   b is the longer value
301			//       b must be the lesser value: return false
302			//     else:
303			//       a must be the lesser value: return true
304			//
305			//   positive values:
306			//     b is the longer value
307			//       values in b must be greater than a: return true
308			//     else:
309			//       values in a must be greater than b: return false
310			neg := sa[0] < 0
311			blonger := len(sa) < len(sb)
312			return neg != blonger
313		}
314	} else {
315		a = a[1:]
316		b = b[1:]
317	}
318
319	return bytes.Compare(a, b) == -1
320}
321
322func (BooleanStatistics) defaultMin() bool { return true }
323func (BooleanStatistics) defaultMax() bool { return false }
324func (s *Int32Statistics) defaultMin() int32 {
325	if s.order == schema.SortUNSIGNED {
326		val := math.MaxUint32
327		return int32(val)
328	}
329	return math.MaxInt32
330}
331
332func (s *Int32Statistics) defaultMax() int32 {
333	if s.order == schema.SortUNSIGNED {
334		return int32(0)
335	}
336	return math.MinInt32
337}
338
339func (s *Int64Statistics) defaultMin() int64 {
340	if s.order == schema.SortUNSIGNED {
341		val := uint64(math.MaxUint64)
342		return int64(val)
343	}
344	return math.MaxInt64
345}
346
347func (s *Int64Statistics) defaultMax() int64 {
348	if s.order == schema.SortUNSIGNED {
349		return int64(0)
350	}
351	return math.MinInt64
352}
353
// Sentinel Int96 values returned by Int96Statistics.defaultMin and
// Int96Statistics.defaultMax. The non-zero ones are filled in by init.
var (
	defaultMinInt96  parquet.Int96 // starting min for signed sort order
	defaultMinUInt96 parquet.Int96 // starting min for unsigned sort order
	defaultMaxInt96  parquet.Int96 // starting max for signed sort order
	defaultMaxUInt96 parquet.Int96 // starting max for unsigned sort order; stays all zero
)
360
361func init() {
362	i96 := arrow.Uint32Traits.CastFromBytes(defaultMinInt96[:])
363	i96[0] = math.MaxUint32
364	i96[1] = math.MaxUint32
365	i96[2] = math.MaxInt32
366
367	i96 = arrow.Uint32Traits.CastFromBytes(defaultMinUInt96[:])
368	i96[0] = math.MaxUint32
369	i96[1] = math.MaxUint32
370	i96[2] = math.MaxUint32
371
372	// golang will initialize the bytes to 0
373	i96 = arrow.Uint32Traits.CastFromBytes(defaultMaxInt96[:])
374	i96[2] = math.MaxInt32 + 1
375
376	// defaultMaxUInt96 will be initialized to 0 as desired
377}
378
379func (s *Int96Statistics) defaultMin() parquet.Int96 {
380	if s.order == schema.SortUNSIGNED {
381		return defaultMinUInt96
382	}
383	return defaultMinInt96
384}
385
386func (s *Int96Statistics) defaultMax() parquet.Int96 {
387	if s.order == schema.SortUNSIGNED {
388		return defaultMaxUInt96
389	}
390	return defaultMaxInt96
391}
392
393func (Float32Statistics) defaultMin() float32                             { return math.MaxFloat32 }
394func (Float32Statistics) defaultMax() float32                             { return -math.MaxFloat32 }
395func (Float64Statistics) defaultMin() float64                             { return math.MaxFloat64 }
396func (Float64Statistics) defaultMax() float64                             { return -math.MaxFloat64 }
397func (ByteArrayStatistics) defaultMin() parquet.ByteArray                 { return nil }
398func (ByteArrayStatistics) defaultMax() parquet.ByteArray                 { return nil }
399func (FixedLenByteArrayStatistics) defaultMin() parquet.FixedLenByteArray { return nil }
400func (FixedLenByteArrayStatistics) defaultMax() parquet.FixedLenByteArray { return nil }
401
402func (BooleanStatistics) equal(a, b bool) bool                { return a == b }
403func (Int32Statistics) equal(a, b int32) bool                 { return a == b }
404func (Int64Statistics) equal(a, b int64) bool                 { return a == b }
405func (Float32Statistics) equal(a, b float32) bool             { return a == b }
406func (Float64Statistics) equal(a, b float64) bool             { return a == b }
407func (Int96Statistics) equal(a, b parquet.Int96) bool         { return bytes.Equal(a[:], b[:]) }
408func (ByteArrayStatistics) equal(a, b parquet.ByteArray) bool { return bytes.Equal(a, b) }
409func (FixedLenByteArrayStatistics) equal(a, b parquet.FixedLenByteArray) bool {
410	return bytes.Equal(a, b)
411}
412
413func (BooleanStatistics) less(a, b bool) bool {
414	return !a && b
415}
416
417func (s *Int32Statistics) less(a, b int32) bool {
418	if s.order == schema.SortUNSIGNED {
419		return uint32(a) < uint32(b)
420	}
421	return a < b
422}
423
424func (s *Int64Statistics) less(a, b int64) bool {
425	if s.order == schema.SortUNSIGNED {
426		return uint64(a) < uint64(b)
427	}
428	return a < b
429}
430func (Float32Statistics) less(a, b float32) bool { return a < b }
431func (Float64Statistics) less(a, b float64) bool { return a < b }
432func (s *Int96Statistics) less(a, b parquet.Int96) bool {
433	i96a := arrow.Uint32Traits.CastFromBytes(a[:])
434	i96b := arrow.Uint32Traits.CastFromBytes(b[:])
435
436	a0, a1, a2 := utils.ToLEUint32(i96a[0]), utils.ToLEUint32(i96a[1]), utils.ToLEUint32(i96a[2])
437	b0, b1, b2 := utils.ToLEUint32(i96b[0]), utils.ToLEUint32(i96b[1]), utils.ToLEUint32(i96b[2])
438
439	if a2 != b2 {
440		// only the msb bit is by signed comparison
441		if s.order == schema.SortSIGNED {
442			return int32(a2) < int32(b2)
443		}
444		return a2 < b2
445	} else if a1 != b1 {
446		return a1 < b1
447	}
448	return a0 < b0
449}
450
451func (s *ByteArrayStatistics) less(a, b parquet.ByteArray) bool {
452	if s.order == schema.SortUNSIGNED {
453		return bytes.Compare(a, b) == -1
454	}
455
456	return signedByteLess([]byte(a), []byte(b))
457}
458
459func (s *FixedLenByteArrayStatistics) less(a, b parquet.FixedLenByteArray) bool {
460	if s.order == schema.SortUNSIGNED {
461		return bytes.Compare(a, b) == -1
462	}
463
464	return signedByteLess([]byte(a), []byte(b))
465}
466
467func (BooleanStatistics) cleanStat(minMax minmaxPairBoolean) *minmaxPairBoolean { return &minMax }
468func (Int32Statistics) cleanStat(minMax minmaxPairInt32) *minmaxPairInt32       { return &minMax }
469func (Int64Statistics) cleanStat(minMax minmaxPairInt64) *minmaxPairInt64       { return &minMax }
470func (Int96Statistics) cleanStat(minMax minmaxPairInt96) *minmaxPairInt96       { return &minMax }
471
472// in the case of floating point types, the following rules are applied as per parquet-mr:
473// - if any of min/max is NaN, return nothing
474// - if min is 0.0f replace with -0.0f
475// - if max is -0.0f replace with 0.0f
476//
477// https://issues.apache.org/jira/browse/PARQUET-1222 tracks the official documenting of
478// a well-defined order for floats and doubles.
479func (Float32Statistics) cleanStat(minMax minmaxPairFloat32) *minmaxPairFloat32 {
480	if math.IsNaN(float64(minMax[0])) || math.IsNaN(float64(minMax[1])) {
481		return nil
482	}
483
484	if minMax[0] == math.MaxFloat32 && minMax[1] == -math.MaxFloat32 {
485		return nil
486	}
487
488	var zero float32 = 0
489	if minMax[0] == zero && !math.Signbit(float64(minMax[0])) {
490		minMax[0] = -minMax[0]
491	}
492
493	if minMax[1] == zero && math.Signbit(float64(minMax[1])) {
494		minMax[1] = -minMax[1]
495	}
496
497	return &minMax
498}
499
500func (Float64Statistics) cleanStat(minMax minmaxPairFloat64) *minmaxPairFloat64 {
501	if math.IsNaN(minMax[0]) || math.IsNaN(minMax[1]) {
502		return nil
503	}
504
505	if minMax[0] == math.MaxFloat64 && minMax[1] == -math.MaxFloat64 {
506		return nil
507	}
508
509	var zero float64 = 0
510	if minMax[0] == zero && !math.Signbit(minMax[0]) {
511		minMax[0] = -minMax[0]
512	}
513
514	if minMax[1] == zero && math.Signbit(minMax[1]) {
515		minMax[1] = -minMax[1]
516	}
517
518	return &minMax
519}
520
521func (ByteArrayStatistics) cleanStat(minMax minmaxPairByteArray) *minmaxPairByteArray {
522	if minMax[0] == nil || minMax[1] == nil {
523		return nil
524	}
525	return &minMax
526}
527
528func (FixedLenByteArrayStatistics) cleanStat(minMax minmaxPairFixedLenByteArray) *minmaxPairFixedLenByteArray {
529	if minMax[0] == nil || minMax[1] == nil {
530		return nil
531	}
532	return &minMax
533}
534
535func GetStatValue(typ parquet.Type, val []byte) interface{} {
536	switch typ {
537	case parquet.Types.Boolean:
538		return val[0] != 0
539	case parquet.Types.Int32:
540		return int32(binary.LittleEndian.Uint32(val))
541	case parquet.Types.Int64:
542		return int64(binary.LittleEndian.Uint64(val))
543	case parquet.Types.Int96:
544		p := parquet.Int96{}
545		copy(p[:], val)
546		return p
547	case parquet.Types.Float:
548		return math.Float32frombits(binary.LittleEndian.Uint32(val))
549	case parquet.Types.Double:
550		return math.Float64frombits(binary.LittleEndian.Uint64(val))
551	case parquet.Types.ByteArray:
552		fallthrough
553	case parquet.Types.FixedLenByteArray:
554		return val
555	}
556	return nil
557}
558