1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // Imported from Apache Impala (incubating) on 2016-01-29 and modified for use
19 // in parquet-cpp, Arrow
20 
21 #pragma once
22 
23 #include <algorithm>
24 #include <cmath>
25 #include <limits>
26 #include <vector>
27 
28 #include "arrow/util/bit_block_counter.h"
29 #include "arrow/util/bit_run_reader.h"
30 #include "arrow/util/bit_stream_utils.h"
31 #include "arrow/util/bit_util.h"
32 #include "arrow/util/macros.h"
33 
34 namespace arrow {
35 namespace util {
36 
37 /// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
38 /// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
39 /// (literal encoding).
40 /// For both types of runs, there is a byte-aligned indicator which encodes the length
41 /// of the run and the type of the run.
42 /// This encoding has the benefit that when there aren't any long enough runs, values
43 /// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
44 /// the run length are byte aligned. This allows for very efficient decoding
45 /// implementations.
46 /// The encoding is:
47 ///    encoded-block := run*
48 ///    run := literal-run | repeated-run
49 ///    literal-run := literal-indicator < literal bytes >
50 ///    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
51 ///    literal-indicator := varint_encode( number_of_groups << 1 | 1)
52 ///    repeated-indicator := varint_encode( number_of_repetitions << 1 )
53 //
54 /// Each run is preceded by a varint. The varint's least significant bit is
55 /// used to indicate whether the run is a literal run or a repeated run. The rest
56 /// of the varint is used to determine the length of the run (eg how many times the
57 /// value repeats).
58 //
59 /// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
60 /// in groups of 8), so that no matter the bit-width of the value, the sequence will end
61 /// on a byte boundary without padding.
62 /// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
63 /// the actual number of encoded ints. (This means that the total number of encoded values
64 /// can not be determined from the encoded data, since the number of values in the last
65 /// group may not be a multiple of 8). For the last group of literal runs, we pad
66 /// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
67 /// without the need for additional checks.
68 //
69 /// There is a break-even point when it is more storage efficient to do run length
70 /// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
71 /// for both the repeated encoding or the literal encoding.  This value can always
72 /// be computed based on the bit-width.
73 /// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
74 //
75 /// Examples with bit-width 1 (eg encoding booleans):
76 /// ----------------------------------------
77 /// 100 1s followed by 100 0s:
78 /// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
79 ///  - (total 4 bytes)
80 //
81 /// alternating 1s and 0s (200 total):
82 /// 200 ints = 25 groups of 8
83 /// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
84 /// (total 26 bytes, 1 byte overhead)
85 //
86 
87 /// Decoder class for RLE encoded data.
88 class RleDecoder {
89  public:
90   /// Create a decoder object. buffer/buffer_len is the decoded data.
91   /// bit_width is the width of each value (before encoding).
RleDecoder(const uint8_t * buffer,int buffer_len,int bit_width)92   RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
93       : bit_reader_(buffer, buffer_len),
94         bit_width_(bit_width),
95         current_value_(0),
96         repeat_count_(0),
97         literal_count_(0) {
98     DCHECK_GE(bit_width_, 0);
99     DCHECK_LE(bit_width_, 64);
100   }
101 
RleDecoder()102   RleDecoder() : bit_width_(-1) {}
103 
Reset(const uint8_t * buffer,int buffer_len,int bit_width)104   void Reset(const uint8_t* buffer, int buffer_len, int bit_width) {
105     DCHECK_GE(bit_width, 0);
106     DCHECK_LE(bit_width, 64);
107     bit_reader_.Reset(buffer, buffer_len);
108     bit_width_ = bit_width;
109     current_value_ = 0;
110     repeat_count_ = 0;
111     literal_count_ = 0;
112   }
113 
114   /// Gets the next value.  Returns false if there are no more.
115   template <typename T>
116   bool Get(T* val);
117 
118   /// Gets a batch of values.  Returns the number of decoded elements.
119   template <typename T>
120   int GetBatch(T* values, int batch_size);
121 
122   /// Like GetBatch but add spacing for null entries
123   template <typename T>
124   int GetBatchSpaced(int batch_size, int null_count, const uint8_t* valid_bits,
125                      int64_t valid_bits_offset, T* out);
126 
127   /// Like GetBatch but the values are then decoded using the provided dictionary
128   template <typename T>
129   int GetBatchWithDict(const T* dictionary, int32_t dictionary_length, T* values,
130                        int batch_size);
131 
132   /// Like GetBatchWithDict but add spacing for null entries
133   ///
134   /// Null entries will be zero-initialized in `values` to avoid leaking
135   /// private data.
136   template <typename T>
137   int GetBatchWithDictSpaced(const T* dictionary, int32_t dictionary_length, T* values,
138                              int batch_size, int null_count, const uint8_t* valid_bits,
139                              int64_t valid_bits_offset);
140 
141  protected:
142   BitUtil::BitReader bit_reader_;
143   /// Number of bits needed to encode the value. Must be between 0 and 64.
144   int bit_width_;
145   uint64_t current_value_;
146   int32_t repeat_count_;
147   int32_t literal_count_;
148 
149  private:
150   /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
151   /// are no more.
152   template <typename T>
153   bool NextCounts();
154 
155   /// Utility methods for retrieving spaced values.
156   template <typename T, typename RunType, typename Converter>
157   int GetSpaced(Converter converter, int batch_size, int null_count,
158                 const uint8_t* valid_bits, int64_t valid_bits_offset, T* out);
159 };
160 
161 /// Class to incrementally build the rle data.   This class does not allocate any memory.
162 /// The encoding has two modes: encoding repeated runs and literal runs.
163 /// If the run is sufficiently short, it is more efficient to encode as a literal run.
164 /// This class does so by buffering 8 values at a time.  If they are not all the same
165 /// they are added to the literal run.  If they are the same, they are added to the
166 /// repeated run.  When we switch modes, the previous run is flushed out.
167 class RleEncoder {
168  public:
169   /// buffer/buffer_len: preallocated output buffer.
170   /// bit_width: max number of bits for value.
171   /// TODO: consider adding a min_repeated_run_length so the caller can control
172   /// when values should be encoded as repeated runs.  Currently this is derived
173   /// based on the bit_width, which can determine a storage optimal choice.
174   /// TODO: allow 0 bit_width (and have dict encoder use it)
RleEncoder(uint8_t * buffer,int buffer_len,int bit_width)175   RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
176       : bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
177     DCHECK_GE(bit_width_, 0);
178     DCHECK_LE(bit_width_, 64);
179     max_run_byte_size_ = MinBufferSize(bit_width);
180     DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
181     Clear();
182   }
183 
184   /// Returns the minimum buffer size needed to use the encoder for 'bit_width'
185   /// This is the maximum length of a single run for 'bit_width'.
186   /// It is not valid to pass a buffer less than this length.
MinBufferSize(int bit_width)187   static int MinBufferSize(int bit_width) {
188     /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
189     int max_literal_run_size =
190         1 +
191         static_cast<int>(BitUtil::BytesForBits(MAX_VALUES_PER_LITERAL_RUN * bit_width));
192     /// Up to kMaxVlqByteLength indicator and a single 'bit_width' value.
193     int max_repeated_run_size = BitUtil::BitReader::kMaxVlqByteLength +
194                                 static_cast<int>(BitUtil::BytesForBits(bit_width));
195     return std::max(max_literal_run_size, max_repeated_run_size);
196   }
197 
198   /// Returns the maximum byte size it could take to encode 'num_values'.
MaxBufferSize(int bit_width,int num_values)199   static int MaxBufferSize(int bit_width, int num_values) {
200     // For a bit_width > 1, the worst case is the repetition of "literal run of length 8
201     // and then a repeated run of length 8".
202     // 8 values per smallest run, 8 bits per byte
203     int bytes_per_run = bit_width;
204     int num_runs = static_cast<int>(BitUtil::CeilDiv(num_values, 8));
205     int literal_max_size = num_runs + num_runs * bytes_per_run;
206 
207     // In the very worst case scenario, the data is a concatenation of repeated
208     // runs of 8 values. Repeated run has a 1 byte varint followed by the
209     // bit-packed repeated value
210     int min_repeated_run_size = 1 + static_cast<int>(BitUtil::BytesForBits(bit_width));
211     int repeated_max_size =
212         static_cast<int>(BitUtil::CeilDiv(num_values, 8)) * min_repeated_run_size;
213 
214     return std::max(literal_max_size, repeated_max_size);
215   }
216 
217   /// Encode value.  Returns true if the value fits in buffer, false otherwise.
218   /// This value must be representable with bit_width_ bits.
219   bool Put(uint64_t value);
220 
221   /// Flushes any pending values to the underlying buffer.
222   /// Returns the total number of bytes written
223   int Flush();
224 
225   /// Resets all the state in the encoder.
226   void Clear();
227 
228   /// Returns pointer to underlying buffer
buffer()229   uint8_t* buffer() { return bit_writer_.buffer(); }
len()230   int32_t len() { return bit_writer_.bytes_written(); }
231 
232  private:
233   /// Flushes any buffered values.  If this is part of a repeated run, this is largely
234   /// a no-op.
235   /// If it is part of a literal run, this will call FlushLiteralRun, which writes
236   /// out the buffered literal values.
237   /// If 'done' is true, the current run would be written even if it would normally
238   /// have been buffered more.  This should only be called at the end, when the
239   /// encoder has received all values even if it would normally continue to be
240   /// buffered.
241   void FlushBufferedValues(bool done);
242 
243   /// Flushes literal values to the underlying buffer.  If update_indicator_byte,
244   /// then the current literal run is complete and the indicator byte is updated.
245   void FlushLiteralRun(bool update_indicator_byte);
246 
247   /// Flushes a repeated run to the underlying buffer.
248   void FlushRepeatedRun();
249 
250   /// Checks and sets buffer_full_. This must be called after flushing a run to
251   /// make sure there are enough bytes remaining to encode the next run.
252   void CheckBufferFull();
253 
254   /// The maximum number of values in a single literal run
255   /// (number of groups encodable by a 1-byte indicator * 8)
256   static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8;
257 
258   /// Number of bits needed to encode the value. Must be between 0 and 64.
259   const int bit_width_;
260 
261   /// Underlying buffer.
262   BitUtil::BitWriter bit_writer_;
263 
264   /// If true, the buffer is full and subsequent Put()'s will fail.
265   bool buffer_full_;
266 
267   /// The maximum byte size a single run can take.
268   int max_run_byte_size_;
269 
270   /// We need to buffer at most 8 values for literals.  This happens when the
271   /// bit_width is 1 (so 8 values fit in one byte).
272   /// TODO: generalize this to other bit widths
273   int64_t buffered_values_[8];
274 
275   /// Number of values in buffered_values_
276   int num_buffered_values_;
277 
278   /// The current (also last) value that was written and the count of how
279   /// many times in a row that value has been seen.  This is maintained even
280   /// if we are in a literal run.  If the repeat_count_ get high enough, we switch
281   /// to encoding repeated runs.
282   uint64_t current_value_;
283   int repeat_count_;
284 
285   /// Number of literals in the current run.  This does not include the literals
286   /// that might be in buffered_values_.  Only after we've got a group big enough
287   /// can we decide if they should part of the literal_count_ or repeat_count_
288   int literal_count_;
289 
290   /// Pointer to a byte in the underlying buffer that stores the indicator byte.
291   /// This is reserved as soon as we need a literal run but the value is written
292   /// when the literal run is complete.
293   uint8_t* literal_indicator_byte_;
294 };
295 
296 template <typename T>
Get(T * val)297 inline bool RleDecoder::Get(T* val) {
298   return GetBatch(val, 1) == 1;
299 }
300 
301 template <typename T>
GetBatch(T * values,int batch_size)302 inline int RleDecoder::GetBatch(T* values, int batch_size) {
303   DCHECK_GE(bit_width_, 0);
304   int values_read = 0;
305 
306   auto* out = values;
307 
308   while (values_read < batch_size) {
309     int remaining = batch_size - values_read;
310 
311     if (repeat_count_ > 0) {  // Repeated value case.
312       int repeat_batch = std::min(remaining, repeat_count_);
313       std::fill(out, out + repeat_batch, static_cast<T>(current_value_));
314 
315       repeat_count_ -= repeat_batch;
316       values_read += repeat_batch;
317       out += repeat_batch;
318     } else if (literal_count_ > 0) {
319       int literal_batch = std::min(remaining, literal_count_);
320       int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch);
321       if (actual_read != literal_batch) {
322         return values_read;
323       }
324 
325       literal_count_ -= literal_batch;
326       values_read += literal_batch;
327       out += literal_batch;
328     } else {
329       if (!NextCounts<T>()) return values_read;
330     }
331   }
332 
333   return values_read;
334 }
335 
336 template <typename T, typename RunType, typename Converter>
GetSpaced(Converter converter,int batch_size,int null_count,const uint8_t * valid_bits,int64_t valid_bits_offset,T * out)337 inline int RleDecoder::GetSpaced(Converter converter, int batch_size, int null_count,
338                                  const uint8_t* valid_bits, int64_t valid_bits_offset,
339                                  T* out) {
340   if (ARROW_PREDICT_FALSE(null_count == batch_size)) {
341     converter.FillZero(out, out + batch_size);
342     return batch_size;
343   }
344 
345   DCHECK_GE(bit_width_, 0);
346   int values_read = 0;
347   int values_remaining = batch_size - null_count;
348 
349   // Assume no bits to start.
350   arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset,
351                                            /*length=*/batch_size);
352   arrow::internal::BitRun valid_run = bit_reader.NextRun();
353   while (values_read < batch_size) {
354     if (ARROW_PREDICT_FALSE(valid_run.length == 0)) {
355       valid_run = bit_reader.NextRun();
356     }
357 
358     DCHECK_GT(batch_size, 0);
359     DCHECK_GT(valid_run.length, 0);
360 
361     if (valid_run.set) {
362       if ((repeat_count_ == 0) && (literal_count_ == 0)) {
363         if (!NextCounts<RunType>()) return values_read;
364         DCHECK((repeat_count_ > 0) ^ (literal_count_ > 0));
365       }
366 
367       if (repeat_count_ > 0) {
368         int repeat_batch = 0;
369         // Consume the entire repeat counts incrementing repeat_batch to
370         // be the total of nulls + values consumed, we only need to
371         // get the total count because we can fill in the same value for
372         // nulls and non-nulls. This proves to be a big efficiency win.
373         while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {
374           DCHECK_GT(valid_run.length, 0);
375           if (valid_run.set) {
376             int update_size = std::min(static_cast<int>(valid_run.length), repeat_count_);
377             repeat_count_ -= update_size;
378             repeat_batch += update_size;
379             valid_run.length -= update_size;
380             values_remaining -= update_size;
381           } else {
382             // We can consume all nulls here because we would do so on
383             //  the next loop anyways.
384             repeat_batch += static_cast<int>(valid_run.length);
385             valid_run.length = 0;
386           }
387           if (valid_run.length == 0) {
388             valid_run = bit_reader.NextRun();
389           }
390         }
391         RunType current_value = static_cast<RunType>(current_value_);
392         if (ARROW_PREDICT_FALSE(!converter.IsValid(current_value))) {
393           return values_read;
394         }
395         converter.Fill(out, out + repeat_batch, current_value);
396         out += repeat_batch;
397         values_read += repeat_batch;
398       } else if (literal_count_ > 0) {
399         int literal_batch = std::min(values_remaining, literal_count_);
400         DCHECK_GT(literal_batch, 0);
401 
402         // Decode the literals
403         constexpr int kBufferSize = 1024;
404         RunType indices[kBufferSize];
405         literal_batch = std::min(literal_batch, kBufferSize);
406         int actual_read = bit_reader_.GetBatch(bit_width_, indices, literal_batch);
407         if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) {
408           return values_read;
409         }
410         if (!converter.IsValid(indices, /*length=*/actual_read)) {
411           return values_read;
412         }
413         int skipped = 0;
414         int literals_read = 0;
415         while (literals_read < literal_batch) {
416           if (valid_run.set) {
417             int update_size = std::min(literal_batch - literals_read,
418                                        static_cast<int>(valid_run.length));
419             converter.Copy(out, indices + literals_read, update_size);
420             literals_read += update_size;
421             out += update_size;
422             valid_run.length -= update_size;
423           } else {
424             converter.FillZero(out, out + valid_run.length);
425             out += valid_run.length;
426             skipped += static_cast<int>(valid_run.length);
427             valid_run.length = 0;
428           }
429           if (valid_run.length == 0) {
430             valid_run = bit_reader.NextRun();
431           }
432         }
433         literal_count_ -= literal_batch;
434         values_remaining -= literal_batch;
435         values_read += literal_batch + skipped;
436       }
437     } else {
438       converter.FillZero(out, out + valid_run.length);
439       out += valid_run.length;
440       values_read += static_cast<int>(valid_run.length);
441       valid_run.length = 0;
442     }
443   }
444   DCHECK_EQ(valid_run.length, 0);
445   DCHECK_EQ(values_remaining, 0);
446   return values_read;
447 }
448 
449 // Converter for GetSpaced that handles runs that get returned
450 // directly as output.
451 template <typename T>
452 struct PlainRleConverter {
453   T kZero = {};
IsValidPlainRleConverter454   inline bool IsValid(const T& values) const { return true; }
IsValidPlainRleConverter455   inline bool IsValid(const T* values, int32_t length) const { return true; }
FillPlainRleConverter456   inline void Fill(T* begin, T* end, const T& run_value) const {
457     std::fill(begin, end, run_value);
458   }
FillZeroPlainRleConverter459   inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); }
CopyPlainRleConverter460   inline void Copy(T* out, const T* values, int length) const {
461     std::memcpy(out, values, length * sizeof(T));
462   }
463 };
464 
465 template <typename T>
GetBatchSpaced(int batch_size,int null_count,const uint8_t * valid_bits,int64_t valid_bits_offset,T * out)466 inline int RleDecoder::GetBatchSpaced(int batch_size, int null_count,
467                                       const uint8_t* valid_bits,
468                                       int64_t valid_bits_offset, T* out) {
469   if (null_count == 0) {
470     return GetBatch<T>(out, batch_size);
471   }
472 
473   PlainRleConverter<T> converter;
474   arrow::internal::BitBlockCounter block_counter(valid_bits, valid_bits_offset,
475                                                  batch_size);
476 
477   int total_processed = 0;
478   int processed = 0;
479   arrow::internal::BitBlockCount block;
480 
481   do {
482     block = block_counter.NextFourWords();
483     if (block.length == 0) {
484       break;
485     }
486     if (block.AllSet()) {
487       processed = GetBatch<T>(out, block.length);
488     } else if (block.NoneSet()) {
489       converter.FillZero(out, out + block.length);
490       processed = block.length;
491     } else {
492       processed = GetSpaced<T, /*RunType=*/T, PlainRleConverter<T>>(
493           converter, block.length, block.length - block.popcount, valid_bits,
494           valid_bits_offset, out);
495     }
496     total_processed += processed;
497     out += block.length;
498     valid_bits_offset += block.length;
499   } while (processed == block.length);
500   return total_processed;
501 }
502 
IndexInRange(int32_t idx,int32_t dictionary_length)503 static inline bool IndexInRange(int32_t idx, int32_t dictionary_length) {
504   return idx >= 0 && idx < dictionary_length;
505 }
506 
507 // Converter for GetSpaced that handles runs of returned dictionary
508 // indices.
509 template <typename T>
510 struct DictionaryConverter {
511   T kZero = {};
512   const T* dictionary;
513   int32_t dictionary_length;
514 
IsValidDictionaryConverter515   inline bool IsValid(int32_t value) { return IndexInRange(value, dictionary_length); }
516 
IsValidDictionaryConverter517   inline bool IsValid(const int32_t* values, int32_t length) const {
518     using IndexType = int32_t;
519     IndexType min_index = std::numeric_limits<IndexType>::max();
520     IndexType max_index = std::numeric_limits<IndexType>::min();
521     for (int x = 0; x < length; x++) {
522       min_index = std::min(values[x], min_index);
523       max_index = std::max(values[x], max_index);
524     }
525 
526     return IndexInRange(min_index, dictionary_length) &&
527            IndexInRange(max_index, dictionary_length);
528   }
FillDictionaryConverter529   inline void Fill(T* begin, T* end, const int32_t& run_value) const {
530     std::fill(begin, end, dictionary[run_value]);
531   }
FillZeroDictionaryConverter532   inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); }
533 
CopyDictionaryConverter534   inline void Copy(T* out, const int32_t* values, int length) const {
535     for (int x = 0; x < length; x++) {
536       out[x] = dictionary[values[x]];
537     }
538   }
539 };
540 
541 template <typename T>
GetBatchWithDict(const T * dictionary,int32_t dictionary_length,T * values,int batch_size)542 inline int RleDecoder::GetBatchWithDict(const T* dictionary, int32_t dictionary_length,
543                                         T* values, int batch_size) {
544   // Per https://github.com/apache/parquet-format/blob/master/Encodings.md,
545   // the maximum dictionary index width in Parquet is 32 bits.
546   using IndexType = int32_t;
547   DictionaryConverter<T> converter;
548   converter.dictionary = dictionary;
549   converter.dictionary_length = dictionary_length;
550 
551   DCHECK_GE(bit_width_, 0);
552   int values_read = 0;
553 
554   auto* out = values;
555 
556   while (values_read < batch_size) {
557     int remaining = batch_size - values_read;
558 
559     if (repeat_count_ > 0) {
560       auto idx = static_cast<IndexType>(current_value_);
561       if (ARROW_PREDICT_FALSE(!IndexInRange(idx, dictionary_length))) {
562         return values_read;
563       }
564       T val = dictionary[idx];
565 
566       int repeat_batch = std::min(remaining, repeat_count_);
567       std::fill(out, out + repeat_batch, val);
568 
569       /* Upkeep counters */
570       repeat_count_ -= repeat_batch;
571       values_read += repeat_batch;
572       out += repeat_batch;
573     } else if (literal_count_ > 0) {
574       constexpr int kBufferSize = 1024;
575       IndexType indices[kBufferSize];
576 
577       int literal_batch = std::min(remaining, literal_count_);
578       literal_batch = std::min(literal_batch, kBufferSize);
579 
580       int actual_read = bit_reader_.GetBatch(bit_width_, indices, literal_batch);
581       if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) {
582         return values_read;
583       }
584       if (ARROW_PREDICT_FALSE(!converter.IsValid(indices, /*length=*/literal_batch))) {
585         return values_read;
586       }
587       converter.Copy(out, indices, literal_batch);
588 
589       /* Upkeep counters */
590       literal_count_ -= literal_batch;
591       values_read += literal_batch;
592       out += literal_batch;
593     } else {
594       if (!NextCounts<IndexType>()) return values_read;
595     }
596   }
597 
598   return values_read;
599 }
600 
601 template <typename T>
GetBatchWithDictSpaced(const T * dictionary,int32_t dictionary_length,T * out,int batch_size,int null_count,const uint8_t * valid_bits,int64_t valid_bits_offset)602 inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary,
603                                               int32_t dictionary_length, T* out,
604                                               int batch_size, int null_count,
605                                               const uint8_t* valid_bits,
606                                               int64_t valid_bits_offset) {
607   if (null_count == 0) {
608     return GetBatchWithDict<T>(dictionary, dictionary_length, out, batch_size);
609   }
610   arrow::internal::BitBlockCounter block_counter(valid_bits, valid_bits_offset,
611                                                  batch_size);
612   using IndexType = int32_t;
613   DictionaryConverter<T> converter;
614   converter.dictionary = dictionary;
615   converter.dictionary_length = dictionary_length;
616 
617   int total_processed = 0;
618   int processed = 0;
619   arrow::internal::BitBlockCount block;
620   do {
621     block = block_counter.NextFourWords();
622     if (block.length == 0) {
623       break;
624     }
625     if (block.AllSet()) {
626       processed = GetBatchWithDict<T>(dictionary, dictionary_length, out, block.length);
627     } else if (block.NoneSet()) {
628       converter.FillZero(out, out + block.length);
629       processed = block.length;
630     } else {
631       processed = GetSpaced<T, /*RunType=*/IndexType, DictionaryConverter<T>>(
632           converter, block.length, block.length - block.popcount, valid_bits,
633           valid_bits_offset, out);
634     }
635     total_processed += processed;
636     out += block.length;
637     valid_bits_offset += block.length;
638   } while (processed == block.length);
639   return total_processed;
640 }
641 
642 template <typename T>
NextCounts()643 bool RleDecoder::NextCounts() {
644   // Read the next run's indicator int, it could be a literal or repeated run.
645   // The int is encoded as a vlq-encoded value.
646   uint32_t indicator_value = 0;
647   if (!bit_reader_.GetVlqInt(&indicator_value)) return false;
648 
649   // lsb indicates if it is a literal run or repeated run
650   bool is_literal = indicator_value & 1;
651   uint32_t count = indicator_value >> 1;
652   if (is_literal) {
653     if (ARROW_PREDICT_FALSE(count == 0 || count > static_cast<uint32_t>(INT32_MAX) / 8)) {
654       return false;
655     }
656     literal_count_ = count * 8;
657   } else {
658     if (ARROW_PREDICT_FALSE(count == 0 || count > static_cast<uint32_t>(INT32_MAX))) {
659       return false;
660     }
661     repeat_count_ = count;
662     T value = {};
663     if (!bit_reader_.GetAligned<T>(static_cast<int>(BitUtil::CeilDiv(bit_width_, 8)),
664                                    &value)) {
665       return false;
666     }
667     current_value_ = static_cast<uint64_t>(value);
668   }
669   return true;
670 }
671 
672 /// This function buffers input values 8 at a time.  After seeing all 8 values,
673 /// it decides whether they should be encoded as a literal or repeated run.
Put(uint64_t value)674 inline bool RleEncoder::Put(uint64_t value) {
675   DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_));
676   if (ARROW_PREDICT_FALSE(buffer_full_)) return false;
677 
678   if (ARROW_PREDICT_TRUE(current_value_ == value)) {
679     ++repeat_count_;
680     if (repeat_count_ > 8) {
681       // This is just a continuation of the current run, no need to buffer the
682       // values.
683       // Note that this is the fast path for long repeated runs.
684       return true;
685     }
686   } else {
687     if (repeat_count_ >= 8) {
688       // We had a run that was long enough but it has ended.  Flush the
689       // current repeated run.
690       DCHECK_EQ(literal_count_, 0);
691       FlushRepeatedRun();
692     }
693     repeat_count_ = 1;
694     current_value_ = value;
695   }
696 
697   buffered_values_[num_buffered_values_] = value;
698   if (++num_buffered_values_ == 8) {
699     DCHECK_EQ(literal_count_ % 8, 0);
700     FlushBufferedValues(false);
701   }
702   return true;
703 }
704 
FlushLiteralRun(bool update_indicator_byte)705 inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
706   if (literal_indicator_byte_ == NULL) {
707     // The literal indicator byte has not been reserved yet, get one now.
708     literal_indicator_byte_ = bit_writer_.GetNextBytePtr();
709     DCHECK(literal_indicator_byte_ != NULL);
710   }
711 
712   // Write all the buffered values as bit packed literals
713   for (int i = 0; i < num_buffered_values_; ++i) {
714     bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_);
715     DCHECK(success) << "There is a bug in using CheckBufferFull()";
716   }
717   num_buffered_values_ = 0;
718 
719   if (update_indicator_byte) {
720     // At this point we need to write the indicator byte for the literal run.
721     // We only reserve one byte, to allow for streaming writes of literal values.
722     // The logic makes sure we flush literal runs often enough to not overrun
723     // the 1 byte.
724     DCHECK_EQ(literal_count_ % 8, 0);
725     int num_groups = literal_count_ / 8;
726     int32_t indicator_value = (num_groups << 1) | 1;
727     DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
728     *literal_indicator_byte_ = static_cast<uint8_t>(indicator_value);
729     literal_indicator_byte_ = NULL;
730     literal_count_ = 0;
731     CheckBufferFull();
732   }
733 }
734 
FlushRepeatedRun()735 inline void RleEncoder::FlushRepeatedRun() {
736   DCHECK_GT(repeat_count_, 0);
737   bool result = true;
738   // The lsb of 0 indicates this is a repeated run
739   int32_t indicator_value = repeat_count_ << 1 | 0;
740   result &= bit_writer_.PutVlqInt(static_cast<uint32_t>(indicator_value));
741   result &= bit_writer_.PutAligned(current_value_,
742                                    static_cast<int>(BitUtil::CeilDiv(bit_width_, 8)));
743   DCHECK(result);
744   num_buffered_values_ = 0;
745   repeat_count_ = 0;
746   CheckBufferFull();
747 }
748 
749 /// Flush the values that have been buffered.  At this point we decide whether
750 /// we need to switch between the run types or continue the current one.
FlushBufferedValues(bool done)751 inline void RleEncoder::FlushBufferedValues(bool done) {
752   if (repeat_count_ >= 8) {
753     // Clear the buffered values.  They are part of the repeated run now and we
754     // don't want to flush them out as literals.
755     num_buffered_values_ = 0;
756     if (literal_count_ != 0) {
757       // There was a current literal run.  All the values in it have been flushed
758       // but we still need to update the indicator byte.
759       DCHECK_EQ(literal_count_ % 8, 0);
760       DCHECK_EQ(repeat_count_, 8);
761       FlushLiteralRun(true);
762     }
763     DCHECK_EQ(literal_count_, 0);
764     return;
765   }
766 
767   literal_count_ += num_buffered_values_;
768   DCHECK_EQ(literal_count_ % 8, 0);
769   int num_groups = literal_count_ / 8;
770   if (num_groups + 1 >= (1 << 6)) {
771     // We need to start a new literal run because the indicator byte we've reserved
772     // cannot store more values.
773     DCHECK(literal_indicator_byte_ != NULL);
774     FlushLiteralRun(true);
775   } else {
776     FlushLiteralRun(done);
777   }
778   repeat_count_ = 0;
779 }
780 
Flush()781 inline int RleEncoder::Flush() {
782   if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
783     bool all_repeat = literal_count_ == 0 && (repeat_count_ == num_buffered_values_ ||
784                                               num_buffered_values_ == 0);
785     // There is something pending, figure out if it's a repeated or literal run
786     if (repeat_count_ > 0 && all_repeat) {
787       FlushRepeatedRun();
788     } else {
789       DCHECK_EQ(literal_count_ % 8, 0);
790       // Buffer the last group of literals to 8 by padding with 0s.
791       for (; num_buffered_values_ != 0 && num_buffered_values_ < 8;
792            ++num_buffered_values_) {
793         buffered_values_[num_buffered_values_] = 0;
794       }
795       literal_count_ += num_buffered_values_;
796       FlushLiteralRun(true);
797       repeat_count_ = 0;
798     }
799   }
800   bit_writer_.Flush();
801   DCHECK_EQ(num_buffered_values_, 0);
802   DCHECK_EQ(literal_count_, 0);
803   DCHECK_EQ(repeat_count_, 0);
804 
805   return bit_writer_.bytes_written();
806 }
807 
CheckBufferFull()808 inline void RleEncoder::CheckBufferFull() {
809   int bytes_written = bit_writer_.bytes_written();
810   if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
811     buffer_full_ = true;
812   }
813 }
814 
Clear()815 inline void RleEncoder::Clear() {
816   buffer_full_ = false;
817   current_value_ = 0;
818   repeat_count_ = 0;
819   num_buffered_values_ = 0;
820   literal_count_ = 0;
821   literal_indicator_byte_ = NULL;
822   bit_writer_.Clear();
823 }
824 
825 }  // namespace util
826 }  // namespace arrow
827