1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package encoding
18
19import (
20	"bytes"
21	"reflect"
22
23	"github.com/apache/arrow/go/v6/arrow/memory"
24	"github.com/apache/arrow/go/v6/parquet"
25	"github.com/apache/arrow/go/v6/parquet/internal/debug"
26	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
27	"github.com/apache/arrow/go/v6/parquet/internal/utils"
28	"github.com/apache/arrow/go/v6/parquet/schema"
29	"golang.org/x/xerrors"
30)
31
// DecoderTraits provides an interface for more easily interacting with types
// to generate decoders for specific types.
type DecoderTraits interface {
	// Decoder builds a TypedDecoder for encoding e and column descriptor descr,
	// handing it mem as its memory allocator. Within this file it is called with
	// useDict=false by NewDecoder, and with useDict=true together with the
	// RLEDict encoding by NewDictDecoder.
	Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder
	// BytesRequired presumably reports how many bytes are needed to hold the
	// given number of values of the type — TODO confirm against implementations.
	BytesRequired(int) int
}
38
39// NewDecoder constructs a decoder for a given type and encoding
40func NewDecoder(t parquet.Type, e parquet.Encoding, descr *schema.Column, mem memory.Allocator) TypedDecoder {
41	traits := getDecodingTraits(t)
42	if traits == nil {
43		return nil
44	}
45
46	return traits.Decoder(e, descr, false /* use dictionary */, mem)
47}
48
49// NewDictDecoder is like NewDecoder but for dictionary encodings, panics if type is bool.
50//
51// if mem is nil, memory.DefaultAllocator will be used
52func NewDictDecoder(t parquet.Type, descr *schema.Column, mem memory.Allocator) DictDecoder {
53	traits := getDecodingTraits(t)
54	if traits == nil {
55		return nil
56	}
57
58	if mem == nil {
59		mem = memory.DefaultAllocator
60	}
61
62	return traits.Decoder(parquet.Encodings.RLEDict, descr, true /* use dictionary */, mem).(DictDecoder)
63}
64
// decoder is the base decoding state built by newDecoderBase and embedded in
// the type specific decoders.
type decoder struct {
	descr    *schema.Column  // column descriptor given to newDecoderBase; may be nil
	encoding format.Encoding // encoding given to newDecoderBase, reported by Encoding
	nvals    int             // value count stored by SetData, reported by ValuesLeft
	data     []byte          // raw bytes stored by SetData
	typeLen  int             // descr.TypeLength() for FixedLenByteArray columns, otherwise -1
}
72
73// newDecoderBase constructs the base decoding object that is embedded in the
74// type specific decoders.
75func newDecoderBase(e format.Encoding, descr *schema.Column) decoder {
76	typeLen := -1
77	if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray {
78		typeLen = int(descr.TypeLength())
79	}
80
81	return decoder{
82		descr:    descr,
83		encoding: e,
84		typeLen:  typeLen,
85	}
86}
87
88// SetData sets the data for decoding into the decoder to update the available
89// data bytes and number of values available.
90func (d *decoder) SetData(nvals int, data []byte) error {
91	d.data = data
92	d.nvals = nvals
93	return nil
94}
95
96// ValuesLeft returns the number of remaining values that can be decoded
97func (d *decoder) ValuesLeft() int { return d.nvals }
98
99// Encoding returns the encoding type used by this decoder to decode the bytes.
100func (d *decoder) Encoding() parquet.Encoding { return parquet.Encoding(d.encoding) }
101
// dictDecoder extends the embedded base decoder with the state needed to
// decode dictionary-encoded data: a converter for the dictionary values and an
// RLE decoder for the indexes.
type dictDecoder struct {
	decoder
	mem              memory.Allocator          // allocator; not referenced in this section of the file
	dictValueDecoder utils.DictionaryConverter // built from the dictionary by SetDict, passed to idxDecoder when decoding
	idxDecoder       *utils.RleDecoder         // RLE decoder over the index data, created by SetData
}
108
109// SetDict sets a decoder that can be used to decode the dictionary that is
110// used for this column in order to return the proper values.
111func (d *dictDecoder) SetDict(dict TypedDecoder) {
112	if dict.Type() != d.descr.PhysicalType() {
113		panic("parquet: mismatch dictionary and column data type")
114	}
115
116	d.dictValueDecoder = NewDictConverter(dict)
117}
118
119// SetData sets the index value data into the decoder.
120func (d *dictDecoder) SetData(nvals int, data []byte) error {
121	d.nvals = nvals
122	if len(data) == 0 {
123		// no data, bitwidth can safely be 0
124		d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data), 0 /* bitwidth */)
125		return nil
126	}
127
128	// grab the bit width from the first byte
129	width := uint8(data[0])
130	if width >= 64 {
131		return xerrors.New("parquet: invalid or corrupted bit width")
132	}
133
134	// pass the rest of the data, minus that first byte, to the decoder
135	d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data[1:]), int(width))
136	return nil
137}
138
139func (d *dictDecoder) decode(out interface{}) (int, error) {
140	return d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out)
141}
142
143func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
144	return d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, nullCount, validBits, validBitsOffset)
145}
146
// empty is a one element byte array holding a single zero byte.
// NOTE(review): nothing in this section of the file references it; presumably
// it is used by decoders defined elsewhere in the package — confirm.
var empty = [1]byte{0}
148
149// spacedExpand is used to take a slice of data and utilize the bitmap provided to fill in nulls into the
150// correct slots according to the bitmap in order to produce a fully expanded result slice with nulls
151// in the correct slots.
152func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBitsOffset int64) int {
153	bufferRef := reflect.ValueOf(buffer)
154	if bufferRef.Kind() != reflect.Slice {
155		panic("invalid spacedexpand type, not slice")
156	}
157
158	var (
159		numValues int = bufferRef.Len()
160	)
161
162	idxDecode := int64(numValues - nullCount)
163	if idxDecode == 0 { // if there's nothing to decode there's nothing to do.
164		return numValues
165	}
166
167	// read the bitmap in reverse grabbing runs of valid bits where possible.
168	rdr := utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(numValues))
169	for {
170		run := rdr.NextRun()
171		if run.Length == 0 {
172			break
173		}
174
175		// copy data from the end of the slice to it's proper location in the slice after accounting for the nulls
176		// because we technically don't care what is in the null slots we don't actually have to clean
177		// up after ourselves because we're doing this in reverse to guarantee that we'll always simply
178		// overwrite any existing data with the correctly spaced data. Any data that happens to be left in the null
179		// slots is fine since it shouldn't matter and saves us work.
180		idxDecode -= run.Length
181		n := reflect.Copy(bufferRef.Slice(int(run.Pos), bufferRef.Len()), bufferRef.Slice(int(idxDecode), int(int64(idxDecode)+run.Length)))
182		debug.Assert(n == int(run.Length), "reflect.Copy copied incorrect number of elements in spacedExpand")
183	}
184
185	return numValues
186}
187