1// Licensed to the Apache Software Foundation (ASF) under one 2// or more contributor license agreements. See the NOTICE file 3// distributed with this work for additional information 4// regarding copyright ownership. The ASF licenses this file 5// to you under the Apache License, Version 2.0 (the 6// "License"); you may not use this file except in compliance 7// with the License. You may obtain a copy of the License at 8// 9// http://www.apache.org/licenses/LICENSE-2.0 10// 11// Unless required by applicable law or agreed to in writing, software 12// distributed under the License is distributed on an "AS IS" BASIS, 13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14// See the License for the specific language governing permissions and 15// limitations under the License. 16 17package encoding 18 19import ( 20 "bytes" 21 "math" 22 "math/bits" 23 "reflect" 24 25 "github.com/apache/arrow/go/v6/arrow" 26 "github.com/apache/arrow/go/v6/arrow/memory" 27 "github.com/apache/arrow/go/v6/parquet" 28 "github.com/apache/arrow/go/v6/parquet/internal/utils" 29 "golang.org/x/xerrors" 30) 31 32// see the deltaBitPack encoder for a description of the encoding format that is 33// used for delta-bitpacking. 34type deltaBitPackDecoder struct { 35 decoder 36 37 mem memory.Allocator 38 39 usedFirst bool 40 bitdecoder *utils.BitReader 41 blockSize uint64 42 currentBlockVals uint32 43 miniBlocks uint64 44 valsPerMini uint32 45 currentMiniBlockVals uint32 46 minDelta int64 47 miniBlockIdx uint64 48 49 deltaBitWidths *memory.Buffer 50 deltaBitWidth byte 51 52 lastVal int64 53} 54 55// returns the number of bytes read so far 56func (d *deltaBitPackDecoder) bytesRead() int64 { 57 return d.bitdecoder.CurOffset() 58} 59 60func (d *deltaBitPackDecoder) Allocator() memory.Allocator { return d.mem } 61 62// SetData sets the bytes and the expected number of values to decode 63// into the decoder, updating the decoder and allowing it to be reused. 64func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error { 65 // set our data into the underlying decoder for the type 66 if err := d.decoder.SetData(nvalues, data); err != nil { 67 return err 68 } 69 // create a bit reader for our decoder's values 70 d.bitdecoder = utils.NewBitReader(bytes.NewReader(d.data)) 71 d.currentBlockVals = 0 72 d.currentMiniBlockVals = 0 73 if d.deltaBitWidths == nil { 74 d.deltaBitWidths = memory.NewResizableBuffer(d.mem) 75 } 76 77 var ok bool 78 d.blockSize, ok = d.bitdecoder.GetVlqInt() 79 if !ok { 80 return xerrors.New("parquet: eof exception") 81 } 82 83 if d.miniBlocks, ok = d.bitdecoder.GetVlqInt(); !ok { 84 return xerrors.New("parquet: eof exception") 85 } 86 87 var totalValues uint64 88 if totalValues, ok = d.bitdecoder.GetVlqInt(); !ok { 89 return xerrors.New("parquet: eof exception") 90 } 91 92 if int(totalValues) != d.nvals { 93 return xerrors.New("parquet: mismatch between number of values and count in data header") 94 } 95 96 if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok { 97 return xerrors.New("parquet: eof exception") 98 } 99 100 if d.miniBlocks != 0 { 101 d.valsPerMini = uint32(d.blockSize / d.miniBlocks) 102 } 103 return nil 104} 105 106// initialize a block to decode 107func (d *deltaBitPackDecoder) initBlock() error { 108 // first we grab the min delta value that we'll start from 109 var ok bool 110 if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok { 111 return xerrors.New("parquet: eof exception") 112 } 113 114 // ensure we have enough space for our miniblocks to decode the widths 115 d.deltaBitWidths.Resize(int(d.miniBlocks)) 116 117 var err error 118 for i := uint64(0); i < d.miniBlocks; i++ { 119 if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil { 120 return err 121 } 122 } 123 124 d.miniBlockIdx = 0 125 d.deltaBitWidth = d.deltaBitWidths.Bytes()[0] 126 d.currentBlockVals = uint32(d.blockSize) 127 return nil 128} 129 130// DeltaBitPackInt32Decoder decodes Int32 values which are packed using the Delta BitPacking algorithm. 131type DeltaBitPackInt32Decoder struct { 132 *deltaBitPackDecoder 133 134 miniBlockValues []int32 135} 136 137func (d *DeltaBitPackInt32Decoder) unpackNextMini() error { 138 if d.miniBlockValues == nil { 139 d.miniBlockValues = make([]int32, 0, int(d.valsPerMini)) 140 } else { 141 d.miniBlockValues = d.miniBlockValues[:0] 142 } 143 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)] 144 d.currentMiniBlockVals = d.valsPerMini 145 146 for j := 0; j < int(d.valsPerMini); j++ { 147 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth)) 148 if !ok { 149 return xerrors.New("parquet: eof exception") 150 } 151 152 d.lastVal += int64(delta) + int64(d.minDelta) 153 d.miniBlockValues = append(d.miniBlockValues, int32(d.lastVal)) 154 } 155 d.miniBlockIdx++ 156 return nil 157} 158 159// Decode retrieves min(remaining values, len(out)) values from the data and returns the number 160// of values actually decoded and any errors encountered. 161func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) { 162 max := utils.MinInt(len(out), d.nvals) 163 if max == 0 { 164 return 0, nil 165 } 166 167 out = out[:max] 168 if !d.usedFirst { // starting value to calculate deltas against 169 out[0] = int32(d.lastVal) 170 out = out[1:] 171 d.usedFirst = true 172 } 173 174 var err error 175 for len(out) > 0 { // unpack mini blocks until we get all the values we need 176 if d.currentBlockVals == 0 { 177 err = d.initBlock() 178 } 179 if d.currentMiniBlockVals == 0 { 180 err = d.unpackNextMini() 181 } 182 if err != nil { 183 return 0, err 184 } 185 186 // copy as many values from our mini block as we can into out 187 start := int(d.valsPerMini - d.currentMiniBlockVals) 188 end := utils.MinInt(int(d.valsPerMini), len(out)) 189 copy(out, d.miniBlockValues[start:end]) 190 191 numCopied := end - start 192 out = out[numCopied:] 193 d.currentBlockVals -= uint32(numCopied) 194 d.currentMiniBlockVals -= uint32(numCopied) 195 } 196 return max, nil 197} 198 199// DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap 200func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { 201 toread := len(out) - nullCount 202 values, err := d.Decode(out[:toread]) 203 if err != nil { 204 return values, err 205 } 206 if values != toread { 207 return values, xerrors.New("parquet: number of values / definition levels read did not match") 208 } 209 210 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil 211} 212 213// Type returns the physical parquet type that this decoder decodes, in this case Int32 214func (DeltaBitPackInt32Decoder) Type() parquet.Type { 215 return parquet.Types.Int32 216} 217 218// DeltaBitPackInt64Decoder decodes a delta bit packed int64 column of data. 219type DeltaBitPackInt64Decoder struct { 220 *deltaBitPackDecoder 221 222 miniBlockValues []int64 223} 224 225func (d *DeltaBitPackInt64Decoder) unpackNextMini() error { 226 if d.miniBlockValues == nil { 227 d.miniBlockValues = make([]int64, 0, int(d.valsPerMini)) 228 } else { 229 d.miniBlockValues = d.miniBlockValues[:0] 230 } 231 232 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)] 233 d.currentMiniBlockVals = d.valsPerMini 234 235 for j := 0; j < int(d.valsPerMini); j++ { 236 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth)) 237 if !ok { 238 return xerrors.New("parquet: eof exception") 239 } 240 241 d.lastVal += int64(delta) + int64(d.minDelta) 242 d.miniBlockValues = append(d.miniBlockValues, d.lastVal) 243 } 244 d.miniBlockIdx++ 245 return nil 246} 247 248// Decode retrieves min(remaining values, len(out)) values from the data and returns the number 249// of values actually decoded and any errors encountered. 250func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) { 251 max := utils.MinInt(len(out), d.nvals) 252 if max == 0 { 253 return 0, nil 254 } 255 256 out = out[:max] 257 if !d.usedFirst { 258 out[0] = d.lastVal 259 out = out[1:] 260 d.usedFirst = true 261 } 262 263 var err error 264 for len(out) > 0 { 265 if d.currentBlockVals == 0 { 266 err = d.initBlock() 267 } 268 if d.currentMiniBlockVals == 0 { 269 err = d.unpackNextMini() 270 } 271 272 if err != nil { 273 return 0, err 274 } 275 276 start := int(d.valsPerMini - d.currentMiniBlockVals) 277 end := utils.MinInt(int(d.valsPerMini), len(out)) 278 copy(out, d.miniBlockValues[start:end]) 279 280 numCopied := end - start 281 out = out[numCopied:] 282 d.currentBlockVals -= uint32(numCopied) 283 d.currentMiniBlockVals -= uint32(numCopied) 284 } 285 return max, nil 286} 287 288// Type returns the physical parquet type that this decoder decodes, in this case Int64 289func (DeltaBitPackInt64Decoder) Type() parquet.Type { 290 return parquet.Types.Int64 291} 292 293// DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap 294func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { 295 toread := len(out) - nullCount 296 values, err := d.Decode(out[:toread]) 297 if err != nil { 298 return values, err 299 } 300 if values != toread { 301 return values, xerrors.New("parquet: number of values / definition levels read did not match") 302 } 303 304 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil 305} 306 307const ( 308 // block size must be a multiple of 128 309 defaultBlockSize = 128 310 defaultNumMiniBlocks = 4 311 // block size / number of mini blocks must result in a multiple of 32 312 defaultNumValuesPerMini = 32 313 // max size of the header for the delta blocks 314 maxHeaderWriterSize = 32 315) 316 317// deltaBitPackEncoder is an encoder for the DeltaBinary Packing format 318// as per the parquet spec. 319// 320// Consists of a header followed by blocks of delta encoded values binary packed. 321// 322// Format 323// [header] [block 1] [block 2] ... [block N] 324// 325// Header 326// [block size] [number of mini blocks per block] [total value count] [first value] 327// 328// Block 329// [min delta] [list of bitwidths of the miniblocks] [miniblocks...] 330// 331// Sets aside bytes at the start of the internal buffer where the header will be written, 332// and only writes the header when FlushValues is called before returning it. 333type deltaBitPackEncoder struct { 334 encoder 335 336 bitWriter *utils.BitWriter 337 totalVals uint64 338 firstVal int64 339 currentVal int64 340 341 blockSize uint64 342 miniBlockSize uint64 343 numMiniBlocks uint64 344 deltas []int64 345} 346 347// flushBlock flushes out a finished block for writing to the underlying encoder 348func (enc *deltaBitPackEncoder) flushBlock() { 349 if len(enc.deltas) == 0 { 350 return 351 } 352 353 // determine the minimum delta value 354 minDelta := int64(math.MaxInt64) 355 for _, delta := range enc.deltas { 356 if delta < minDelta { 357 minDelta = delta 358 } 359 } 360 361 enc.bitWriter.WriteZigZagVlqInt(minDelta) 362 // reserve enough bytes to write out our miniblock deltas 363 offset := enc.bitWriter.ReserveBytes(int(enc.numMiniBlocks)) 364 365 valuesToWrite := int64(len(enc.deltas)) 366 for i := 0; i < int(enc.numMiniBlocks); i++ { 367 n := utils.Min(int64(enc.miniBlockSize), valuesToWrite) 368 if n == 0 { 369 break 370 } 371 372 maxDelta := int64(math.MinInt64) 373 start := i * int(enc.miniBlockSize) 374 for _, val := range enc.deltas[start : start+int(n)] { 375 maxDelta = utils.Max(maxDelta, val) 376 } 377 378 // compute bit width to store (max_delta - min_delta) 379 width := uint(bits.Len64(uint64(maxDelta - minDelta))) 380 // write out the bit width we used into the bytes we reserved earlier 381 enc.bitWriter.WriteAt([]byte{byte(width)}, int64(offset+i)) 382 383 // write out our deltas 384 for _, val := range enc.deltas[start : start+int(n)] { 385 enc.bitWriter.WriteValue(uint64(val-minDelta), width) 386 } 387 388 valuesToWrite -= n 389 390 // pad the last block if n < miniBlockSize 391 for ; n < int64(enc.miniBlockSize); n++ { 392 enc.bitWriter.WriteValue(0, width) 393 } 394 } 395 enc.deltas = enc.deltas[:0] 396} 397 398// putInternal is the implementation for actually writing data which must be 399// integral data as int, int8, int32, or int64. 400func (enc *deltaBitPackEncoder) putInternal(data interface{}) { 401 v := reflect.ValueOf(data) 402 if v.Len() == 0 { 403 return 404 } 405 406 idx := 0 407 if enc.totalVals == 0 { 408 enc.blockSize = defaultBlockSize 409 enc.numMiniBlocks = defaultNumMiniBlocks 410 enc.miniBlockSize = defaultNumValuesPerMini 411 412 enc.firstVal = v.Index(0).Int() 413 enc.currentVal = enc.firstVal 414 idx = 1 415 416 enc.bitWriter = utils.NewBitWriter(enc.sink) 417 } 418 419 enc.totalVals += uint64(v.Len()) 420 for ; idx < v.Len(); idx++ { 421 val := v.Index(idx).Int() 422 enc.deltas = append(enc.deltas, val-enc.currentVal) 423 enc.currentVal = val 424 if len(enc.deltas) == int(enc.blockSize) { 425 enc.flushBlock() 426 } 427 } 428} 429 430// FlushValues flushes any remaining data and returns the finished encoded buffer 431// or returns nil and any error encountered during flushing. 432func (enc *deltaBitPackEncoder) FlushValues() (Buffer, error) { 433 if enc.bitWriter != nil { 434 // write any remaining values 435 enc.flushBlock() 436 enc.bitWriter.Flush(true) 437 } else { 438 enc.blockSize = defaultBlockSize 439 enc.numMiniBlocks = defaultNumMiniBlocks 440 enc.miniBlockSize = defaultNumValuesPerMini 441 } 442 443 buffer := make([]byte, maxHeaderWriterSize) 444 headerWriter := utils.NewBitWriter(utils.NewWriterAtBuffer(buffer)) 445 446 headerWriter.WriteVlqInt(uint64(enc.blockSize)) 447 headerWriter.WriteVlqInt(uint64(enc.numMiniBlocks)) 448 headerWriter.WriteVlqInt(uint64(enc.totalVals)) 449 headerWriter.WriteZigZagVlqInt(int64(enc.firstVal)) 450 headerWriter.Flush(false) 451 452 buffer = buffer[:headerWriter.Written()] 453 enc.totalVals = 0 454 455 if enc.bitWriter != nil { 456 flushed := enc.sink.Finish() 457 defer flushed.Release() 458 459 buffer = append(buffer, flushed.Buf()[:enc.bitWriter.Written()]...) 460 } 461 return poolBuffer{memory.NewBufferBytes(buffer)}, nil 462} 463 464// EstimatedDataEncodedSize returns the current amount of data actually flushed out and written 465func (enc *deltaBitPackEncoder) EstimatedDataEncodedSize() int64 { 466 return int64(enc.bitWriter.Written()) 467} 468 469// DeltaBitPackInt32Encoder is an encoder for the delta bitpacking encoding for int32 data. 470type DeltaBitPackInt32Encoder struct { 471 *deltaBitPackEncoder 472} 473 474// Put writes the values from the provided slice of int32 to the encoder 475func (enc DeltaBitPackInt32Encoder) Put(in []int32) { 476 enc.putInternal(in) 477} 478 479// PutSpaced takes a slice of int32 along with a bitmap that describes the nulls and an offset into the bitmap 480// in order to write spaced data to the encoder. 481func (enc DeltaBitPackInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) { 482 buffer := memory.NewResizableBuffer(enc.mem) 483 buffer.Reserve(arrow.Int32Traits.BytesRequired(len(in))) 484 defer buffer.Release() 485 486 data := arrow.Int32Traits.CastFromBytes(buffer.Buf()) 487 nvalid := spacedCompress(in, data, validBits, validBitsOffset) 488 enc.Put(data[:nvalid]) 489} 490 491// Type returns the underlying physical type this encoder works with, in this case Int32 492func (DeltaBitPackInt32Encoder) Type() parquet.Type { 493 return parquet.Types.Int32 494} 495 496// DeltaBitPackInt32Encoder is an encoder for the delta bitpacking encoding for int32 data. 497type DeltaBitPackInt64Encoder struct { 498 *deltaBitPackEncoder 499} 500 501// Put writes the values from the provided slice of int64 to the encoder 502func (enc DeltaBitPackInt64Encoder) Put(in []int64) { 503 enc.putInternal(in) 504} 505 506// PutSpaced takes a slice of int64 along with a bitmap that describes the nulls and an offset into the bitmap 507// in order to write spaced data to the encoder. 508func (enc DeltaBitPackInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) { 509 buffer := memory.NewResizableBuffer(enc.mem) 510 buffer.Reserve(arrow.Int64Traits.BytesRequired(len(in))) 511 defer buffer.Release() 512 513 data := arrow.Int64Traits.CastFromBytes(buffer.Buf()) 514 nvalid := spacedCompress(in, data, validBits, validBitsOffset) 515 enc.Put(data[:nvalid]) 516} 517 518// Type returns the underlying physical type this encoder works with, in this case Int64 519func (DeltaBitPackInt64Encoder) Type() parquet.Type { 520 return parquet.Types.Int64 521} 522