1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package encoding
18
19import (
20	"bytes"
21	"math"
22	"math/bits"
23	"reflect"
24
25	"github.com/apache/arrow/go/v6/arrow"
26	"github.com/apache/arrow/go/v6/arrow/memory"
27	"github.com/apache/arrow/go/v6/parquet"
28	"github.com/apache/arrow/go/v6/parquet/internal/utils"
29	"golang.org/x/xerrors"
30)
31
// deltaBitPackDecoder holds the state shared by the typed delta bit-pack
// decoders in this file.
//
// see the deltaBitPack encoder for a description of the encoding format that is
// used for delta-bitpacking.
type deltaBitPackDecoder struct {
	decoder

	// allocator used for the deltaBitWidths buffer
	mem memory.Allocator

	// usedFirst records whether the header's first value (kept in lastVal)
	// has already been handed out by Decode.
	usedFirst            bool
	// bitdecoder reads the page data set by SetData.
	bitdecoder           *utils.BitReader
	// blockSize is the first header field read by SetData.
	blockSize            uint64
	// currentBlockVals counts the values of the current block not yet
	// copied out; zero means the next block must be initialized.
	currentBlockVals     uint32
	// miniBlocks is the number of miniblocks per block (second header field).
	miniBlocks           uint64
	// valsPerMini is blockSize / miniBlocks, computed in SetData.
	valsPerMini          uint32
	// currentMiniBlockVals counts the values of the unpacked miniblock not
	// yet copied out; zero means the next miniblock must be unpacked.
	currentMiniBlockVals uint32
	// minDelta is the minimum delta read at the start of the current block.
	minDelta             int64
	// miniBlockIdx is the index, within the current block, of the next
	// miniblock to unpack.
	miniBlockIdx         uint64

	// deltaBitWidths holds one bit-width byte per miniblock of the current block.
	deltaBitWidths *memory.Buffer
	// deltaBitWidth is the bit width of the miniblock currently being decoded.
	deltaBitWidth  byte

	// lastVal is the most recently decoded value; SetData seeds it with the
	// first value stored in the header.
	lastVal int64
}
54
// bytesRead returns the number of bytes read so far, as reported by the
// underlying bit reader's current offset.
func (d *deltaBitPackDecoder) bytesRead() int64 {
	return d.bitdecoder.CurOffset()
}
59
// Allocator returns the memory allocator this decoder was configured with.
func (d *deltaBitPackDecoder) Allocator() memory.Allocator { return d.mem }
61
62// SetData sets the bytes and the expected number of values to decode
63// into the decoder, updating the decoder and allowing it to be reused.
64func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error {
65	// set our data into the underlying decoder for the type
66	if err := d.decoder.SetData(nvalues, data); err != nil {
67		return err
68	}
69	// create a bit reader for our decoder's values
70	d.bitdecoder = utils.NewBitReader(bytes.NewReader(d.data))
71	d.currentBlockVals = 0
72	d.currentMiniBlockVals = 0
73	if d.deltaBitWidths == nil {
74		d.deltaBitWidths = memory.NewResizableBuffer(d.mem)
75	}
76
77	var ok bool
78	d.blockSize, ok = d.bitdecoder.GetVlqInt()
79	if !ok {
80		return xerrors.New("parquet: eof exception")
81	}
82
83	if d.miniBlocks, ok = d.bitdecoder.GetVlqInt(); !ok {
84		return xerrors.New("parquet: eof exception")
85	}
86
87	var totalValues uint64
88	if totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
89		return xerrors.New("parquet: eof exception")
90	}
91
92	if int(totalValues) != d.nvals {
93		return xerrors.New("parquet: mismatch between number of values and count in data header")
94	}
95
96	if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
97		return xerrors.New("parquet: eof exception")
98	}
99
100	if d.miniBlocks != 0 {
101		d.valsPerMini = uint32(d.blockSize / d.miniBlocks)
102	}
103	return nil
104}
105
106// initialize a block to decode
107func (d *deltaBitPackDecoder) initBlock() error {
108	// first we grab the min delta value that we'll start from
109	var ok bool
110	if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
111		return xerrors.New("parquet: eof exception")
112	}
113
114	// ensure we have enough space for our miniblocks to decode the widths
115	d.deltaBitWidths.Resize(int(d.miniBlocks))
116
117	var err error
118	for i := uint64(0); i < d.miniBlocks; i++ {
119		if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil {
120			return err
121		}
122	}
123
124	d.miniBlockIdx = 0
125	d.deltaBitWidth = d.deltaBitWidths.Bytes()[0]
126	d.currentBlockVals = uint32(d.blockSize)
127	return nil
128}
129
// DeltaBitPackInt32Decoder decodes Int32 values which are packed using the Delta BitPacking algorithm.
type DeltaBitPackInt32Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the decoded values of the most recently unpacked
	// miniblock; it is reused between miniblocks.
	miniBlockValues []int32
}
136
137func (d *DeltaBitPackInt32Decoder) unpackNextMini() error {
138	if d.miniBlockValues == nil {
139		d.miniBlockValues = make([]int32, 0, int(d.valsPerMini))
140	} else {
141		d.miniBlockValues = d.miniBlockValues[:0]
142	}
143	d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
144	d.currentMiniBlockVals = d.valsPerMini
145
146	for j := 0; j < int(d.valsPerMini); j++ {
147		delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
148		if !ok {
149			return xerrors.New("parquet: eof exception")
150		}
151
152		d.lastVal += int64(delta) + int64(d.minDelta)
153		d.miniBlockValues = append(d.miniBlockValues, int32(d.lastVal))
154	}
155	d.miniBlockIdx++
156	return nil
157}
158
159// Decode retrieves min(remaining values, len(out)) values from the data and returns the number
160// of values actually decoded and any errors encountered.
161func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) {
162	max := utils.MinInt(len(out), d.nvals)
163	if max == 0 {
164		return 0, nil
165	}
166
167	out = out[:max]
168	if !d.usedFirst { // starting value to calculate deltas against
169		out[0] = int32(d.lastVal)
170		out = out[1:]
171		d.usedFirst = true
172	}
173
174	var err error
175	for len(out) > 0 { // unpack mini blocks until we get all the values we need
176		if d.currentBlockVals == 0 {
177			err = d.initBlock()
178		}
179		if d.currentMiniBlockVals == 0 {
180			err = d.unpackNextMini()
181		}
182		if err != nil {
183			return 0, err
184		}
185
186		// copy as many values from our mini block as we can into out
187		start := int(d.valsPerMini - d.currentMiniBlockVals)
188		end := utils.MinInt(int(d.valsPerMini), len(out))
189		copy(out, d.miniBlockValues[start:end])
190
191		numCopied := end - start
192		out = out[numCopied:]
193		d.currentBlockVals -= uint32(numCopied)
194		d.currentMiniBlockVals -= uint32(numCopied)
195	}
196	return max, nil
197}
198
199// DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
200func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
201	toread := len(out) - nullCount
202	values, err := d.Decode(out[:toread])
203	if err != nil {
204		return values, err
205	}
206	if values != toread {
207		return values, xerrors.New("parquet: number of values / definition levels read did not match")
208	}
209
210	return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
211}
212
// Type returns the physical parquet type that this decoder decodes, in this case Int32.
func (DeltaBitPackInt32Decoder) Type() parquet.Type {
	return parquet.Types.Int32
}
217
// DeltaBitPackInt64Decoder decodes a delta bit packed int64 column of data.
type DeltaBitPackInt64Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the decoded values of the most recently unpacked
	// miniblock; it is reused between miniblocks.
	miniBlockValues []int64
}
224
225func (d *DeltaBitPackInt64Decoder) unpackNextMini() error {
226	if d.miniBlockValues == nil {
227		d.miniBlockValues = make([]int64, 0, int(d.valsPerMini))
228	} else {
229		d.miniBlockValues = d.miniBlockValues[:0]
230	}
231
232	d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
233	d.currentMiniBlockVals = d.valsPerMini
234
235	for j := 0; j < int(d.valsPerMini); j++ {
236		delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
237		if !ok {
238			return xerrors.New("parquet: eof exception")
239		}
240
241		d.lastVal += int64(delta) + int64(d.minDelta)
242		d.miniBlockValues = append(d.miniBlockValues, d.lastVal)
243	}
244	d.miniBlockIdx++
245	return nil
246}
247
248// Decode retrieves min(remaining values, len(out)) values from the data and returns the number
249// of values actually decoded and any errors encountered.
250func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) {
251	max := utils.MinInt(len(out), d.nvals)
252	if max == 0 {
253		return 0, nil
254	}
255
256	out = out[:max]
257	if !d.usedFirst {
258		out[0] = d.lastVal
259		out = out[1:]
260		d.usedFirst = true
261	}
262
263	var err error
264	for len(out) > 0 {
265		if d.currentBlockVals == 0 {
266			err = d.initBlock()
267		}
268		if d.currentMiniBlockVals == 0 {
269			err = d.unpackNextMini()
270		}
271
272		if err != nil {
273			return 0, err
274		}
275
276		start := int(d.valsPerMini - d.currentMiniBlockVals)
277		end := utils.MinInt(int(d.valsPerMini), len(out))
278		copy(out, d.miniBlockValues[start:end])
279
280		numCopied := end - start
281		out = out[numCopied:]
282		d.currentBlockVals -= uint32(numCopied)
283		d.currentMiniBlockVals -= uint32(numCopied)
284	}
285	return max, nil
286}
287
// Type returns the physical parquet type that this decoder decodes, in this case Int64.
func (DeltaBitPackInt64Decoder) Type() parquet.Type {
	return parquet.Types.Int64
}
292
293// DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
294func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
295	toread := len(out) - nullCount
296	values, err := d.Decode(out[:toread])
297	if err != nil {
298		return values, err
299	}
300	if values != toread {
301		return values, xerrors.New("parquet: number of values / definition levels read did not match")
302	}
303
304	return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
305}
306
// Default layout parameters used by deltaBitPackEncoder, plus the size of the
// scratch buffer its header is written into.
const (
	// block size must be a multiple of 128
	defaultBlockSize     = 128
	// number of miniblocks each block is split into
	defaultNumMiniBlocks = 4
	// block size / number of mini blocks must result in a multiple of 32
	defaultNumValuesPerMini = 32
	// max size of the header for the delta blocks; FlushValues allocates a
	// buffer of this many bytes to write the header into
	maxHeaderWriterSize = 32
)
316
// deltaBitPackEncoder is an encoder for the DeltaBinary Packing format
// as per the parquet spec.
//
// Consists of a header followed by blocks of delta encoded values binary packed.
//
//	Format
//		[header] [block 1] [block 2] ... [block N]
//
//	Header
//		[block size] [number of mini blocks per block] [total value count] [first value]
//
//	Block
//		[min delta] [list of bitwidths of the miniblocks] [miniblocks...]
//
// The header is built in a separate buffer when FlushValues is called and the
// encoded blocks are appended after it before the result is returned.
type deltaBitPackEncoder struct {
	encoder

	// bitWriter writes the encoded blocks to the encoder's sink; it is nil
	// until the first value is put.
	bitWriter  *utils.BitWriter
	// totalVals counts the values put since the last FlushValues.
	totalVals  uint64
	// firstVal is the first value put; it is stored in the header.
	firstVal   int64
	// currentVal is the most recent value put, against which the next delta
	// is computed.
	currentVal int64

	// blockSize is the number of deltas that triggers a block flush.
	blockSize     uint64
	// miniBlockSize is the number of values written per miniblock.
	miniBlockSize uint64
	// numMiniBlocks is the number of miniblocks per block.
	numMiniBlocks uint64
	// deltas buffers the deltas of the block currently being built.
	deltas        []int64
}
346
347// flushBlock flushes out a finished block for writing to the underlying encoder
348func (enc *deltaBitPackEncoder) flushBlock() {
349	if len(enc.deltas) == 0 {
350		return
351	}
352
353	// determine the minimum delta value
354	minDelta := int64(math.MaxInt64)
355	for _, delta := range enc.deltas {
356		if delta < minDelta {
357			minDelta = delta
358		}
359	}
360
361	enc.bitWriter.WriteZigZagVlqInt(minDelta)
362	// reserve enough bytes to write out our miniblock deltas
363	offset := enc.bitWriter.ReserveBytes(int(enc.numMiniBlocks))
364
365	valuesToWrite := int64(len(enc.deltas))
366	for i := 0; i < int(enc.numMiniBlocks); i++ {
367		n := utils.Min(int64(enc.miniBlockSize), valuesToWrite)
368		if n == 0 {
369			break
370		}
371
372		maxDelta := int64(math.MinInt64)
373		start := i * int(enc.miniBlockSize)
374		for _, val := range enc.deltas[start : start+int(n)] {
375			maxDelta = utils.Max(maxDelta, val)
376		}
377
378		// compute bit width to store (max_delta - min_delta)
379		width := uint(bits.Len64(uint64(maxDelta - minDelta)))
380		// write out the bit width we used into the bytes we reserved earlier
381		enc.bitWriter.WriteAt([]byte{byte(width)}, int64(offset+i))
382
383		// write out our deltas
384		for _, val := range enc.deltas[start : start+int(n)] {
385			enc.bitWriter.WriteValue(uint64(val-minDelta), width)
386		}
387
388		valuesToWrite -= n
389
390		// pad the last block if n < miniBlockSize
391		for ; n < int64(enc.miniBlockSize); n++ {
392			enc.bitWriter.WriteValue(0, width)
393		}
394	}
395	enc.deltas = enc.deltas[:0]
396}
397
398// putInternal is the implementation for actually writing data which must be
399// integral data as int, int8, int32, or int64.
400func (enc *deltaBitPackEncoder) putInternal(data interface{}) {
401	v := reflect.ValueOf(data)
402	if v.Len() == 0 {
403		return
404	}
405
406	idx := 0
407	if enc.totalVals == 0 {
408		enc.blockSize = defaultBlockSize
409		enc.numMiniBlocks = defaultNumMiniBlocks
410		enc.miniBlockSize = defaultNumValuesPerMini
411
412		enc.firstVal = v.Index(0).Int()
413		enc.currentVal = enc.firstVal
414		idx = 1
415
416		enc.bitWriter = utils.NewBitWriter(enc.sink)
417	}
418
419	enc.totalVals += uint64(v.Len())
420	for ; idx < v.Len(); idx++ {
421		val := v.Index(idx).Int()
422		enc.deltas = append(enc.deltas, val-enc.currentVal)
423		enc.currentVal = val
424		if len(enc.deltas) == int(enc.blockSize) {
425			enc.flushBlock()
426		}
427	}
428}
429
430// FlushValues flushes any remaining data and returns the finished encoded buffer
431// or returns nil and any error encountered during flushing.
432func (enc *deltaBitPackEncoder) FlushValues() (Buffer, error) {
433	if enc.bitWriter != nil {
434		// write any remaining values
435		enc.flushBlock()
436		enc.bitWriter.Flush(true)
437	} else {
438		enc.blockSize = defaultBlockSize
439		enc.numMiniBlocks = defaultNumMiniBlocks
440		enc.miniBlockSize = defaultNumValuesPerMini
441	}
442
443	buffer := make([]byte, maxHeaderWriterSize)
444	headerWriter := utils.NewBitWriter(utils.NewWriterAtBuffer(buffer))
445
446	headerWriter.WriteVlqInt(uint64(enc.blockSize))
447	headerWriter.WriteVlqInt(uint64(enc.numMiniBlocks))
448	headerWriter.WriteVlqInt(uint64(enc.totalVals))
449	headerWriter.WriteZigZagVlqInt(int64(enc.firstVal))
450	headerWriter.Flush(false)
451
452	buffer = buffer[:headerWriter.Written()]
453	enc.totalVals = 0
454
455	if enc.bitWriter != nil {
456		flushed := enc.sink.Finish()
457		defer flushed.Release()
458
459		buffer = append(buffer, flushed.Buf()[:enc.bitWriter.Written()]...)
460	}
461	return poolBuffer{memory.NewBufferBytes(buffer)}, nil
462}
463
464// EstimatedDataEncodedSize returns the current amount of data actually flushed out and written
465func (enc *deltaBitPackEncoder) EstimatedDataEncodedSize() int64 {
466	return int64(enc.bitWriter.Written())
467}
468
// DeltaBitPackInt32Encoder is an encoder for the delta bitpacking encoding for int32 data.
// All encoding state lives in the embedded deltaBitPackEncoder.
type DeltaBitPackInt32Encoder struct {
	*deltaBitPackEncoder
}
473
// Put writes the values from the provided slice of int32 to the encoder.
// It delegates to putInternal on the embedded encoder.
func (enc DeltaBitPackInt32Encoder) Put(in []int32) {
	enc.putInternal(in)
}
478
479// PutSpaced takes a slice of int32 along with a bitmap that describes the nulls and an offset into the bitmap
480// in order to write spaced data to the encoder.
481func (enc DeltaBitPackInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) {
482	buffer := memory.NewResizableBuffer(enc.mem)
483	buffer.Reserve(arrow.Int32Traits.BytesRequired(len(in)))
484	defer buffer.Release()
485
486	data := arrow.Int32Traits.CastFromBytes(buffer.Buf())
487	nvalid := spacedCompress(in, data, validBits, validBitsOffset)
488	enc.Put(data[:nvalid])
489}
490
// Type returns the underlying physical type this encoder works with, in this case Int32.
func (DeltaBitPackInt32Encoder) Type() parquet.Type {
	return parquet.Types.Int32
}
495
// DeltaBitPackInt64Encoder is an encoder for the delta bitpacking encoding for int64 data.
// All encoding state lives in the embedded deltaBitPackEncoder.
type DeltaBitPackInt64Encoder struct {
	*deltaBitPackEncoder
}
500
// Put writes the values from the provided slice of int64 to the encoder.
// It delegates to putInternal on the embedded encoder.
func (enc DeltaBitPackInt64Encoder) Put(in []int64) {
	enc.putInternal(in)
}
505
506// PutSpaced takes a slice of int64 along with a bitmap that describes the nulls and an offset into the bitmap
507// in order to write spaced data to the encoder.
508func (enc DeltaBitPackInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) {
509	buffer := memory.NewResizableBuffer(enc.mem)
510	buffer.Reserve(arrow.Int64Traits.BytesRequired(len(in)))
511	defer buffer.Release()
512
513	data := arrow.Int64Traits.CastFromBytes(buffer.Buf())
514	nvalid := spacedCompress(in, data, validBits, validBitsOffset)
515	enc.Put(data[:nvalid])
516}
517
// Type returns the underlying physical type this encoder works with, in this case Int64.
func (DeltaBitPackInt64Encoder) Type() parquet.Type {
	return parquet.Types.Int64
}
522