1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <algorithm>
21 #include <cstdint>
22 #include <cstring>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 
27 #include "arrow/buffer.h"
28 #include "arrow/status.h"
29 #include "arrow/util/bit_util.h"
30 #include "arrow/util/macros.h"
31 #include "arrow/util/ubsan.h"
32 #include "arrow/util/visibility.h"
33 
34 namespace arrow {
35 
36 // ----------------------------------------------------------------------
37 // Buffer builder classes
38 
39 /// \class BufferBuilder
40 /// \brief A class for incrementally building a contiguous chunk of in-memory
41 /// data
class ARROW_EXPORT BufferBuilder {
 public:
  /// \brief Construct an empty builder that allocates from \a pool on demand.
  explicit BufferBuilder(MemoryPool* pool = default_memory_pool())
      : pool_(pool),
        data_(/*ensure never null to make ubsan happy and avoid check penalties below*/
              &util::internal::non_null_filler),

        capacity_(0),
        size_(0) {}

  /// \brief Constructs new Builder that will start using
  /// the provided buffer until Finish/Reset are called.
  /// The buffer is not resized.
  explicit BufferBuilder(std::shared_ptr<ResizableBuffer> buffer,
                         MemoryPool* pool = default_memory_pool())
      : buffer_(std::move(buffer)),
        pool_(pool),
        data_(buffer_->mutable_data()),
        capacity_(buffer_->capacity()),
        size_(buffer_->size()) {}

  /// \brief Resize the buffer to the nearest multiple of 64 bytes
  ///
  /// \param new_capacity the new capacity of the of the builder. Will be
  /// rounded up to a multiple of 64 bytes for padding \param shrink_to_fit if
  /// new capacity is smaller than the existing size, reallocate internal
  /// buffer. Set to false to avoid reallocations when shrinking the builder.
  /// \return Status
  Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) {
    // Resize(0) is a no-op
    if (new_capacity == 0) {
      return Status::OK();
    }
    if (buffer_ == NULLPTR) {
      // Lazy first allocation: nothing was appended or reserved yet.
      ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(new_capacity, pool_));
    } else {
      ARROW_RETURN_NOT_OK(buffer_->Resize(new_capacity, shrink_to_fit));
    }
    // Re-read capacity/data: the buffer may have rounded the capacity up
    // (padding) and the underlying memory may have moved.
    capacity_ = buffer_->capacity();
    data_ = buffer_->mutable_data();
    return Status::OK();
  }

  /// \brief Ensure that builder can accommodate the additional number of bytes
  /// without the need to perform allocations
  ///
  /// \param[in] additional_bytes number of additional bytes to make space for
  /// \return Status
  Status Reserve(const int64_t additional_bytes) {
    auto min_capacity = size_ + additional_bytes;
    if (min_capacity <= capacity_) {
      return Status::OK();
    }
    // shrink_to_fit=false: Reserve never shrinks the allocation.
    return Resize(GrowByFactor(capacity_, min_capacity), false);
  }

  /// \brief Return a capacity expanded by the desired growth factor
  static int64_t GrowByFactor(int64_t current_capacity, int64_t new_capacity) {
    // Doubling capacity except for large Reserve requests. 2x growth strategy
    // (versus 1.5x) seems to have slightly better performance when using
    // jemalloc, but significantly better performance when using the system
    // allocator. See ARROW-6450 for further discussion
    return std::max(new_capacity, current_capacity * 2);
  }

  /// \brief Append the given data to the buffer
  ///
  /// The buffer is automatically expanded if necessary.
  Status Append(const void* data, const int64_t length) {
    if (ARROW_PREDICT_FALSE(size_ + length > capacity_)) {
      ARROW_RETURN_NOT_OK(Resize(GrowByFactor(capacity_, size_ + length), false));
    }
    UnsafeAppend(data, length);
    return Status::OK();
  }

  /// \brief Append copies of a value to the buffer
  ///
  /// The buffer is automatically expanded if necessary.
  Status Append(const int64_t num_copies, uint8_t value) {
    ARROW_RETURN_NOT_OK(Reserve(num_copies));
    UnsafeAppend(num_copies, value);
    return Status::OK();
  }

  // Advance pointer and zero out memory
  Status Advance(const int64_t length) { return Append(length, 0); }

  // Advance pointer, but don't allocate or zero memory
  void UnsafeAdvance(const int64_t length) { size_ += length; }

  // Unsafe methods don't check existing size
  void UnsafeAppend(const void* data, const int64_t length) {
    memcpy(data_ + size_, data, static_cast<size_t>(length));
    size_ += length;
  }

  void UnsafeAppend(const int64_t num_copies, uint8_t value) {
    memset(data_ + size_, value, static_cast<size_t>(num_copies));
    size_ += num_copies;
  }

  /// \brief Return result of builder as a Buffer object.
  ///
  /// The builder is reset and can be reused afterwards.
  ///
  /// \param[out] out the finalized Buffer object
  /// \param shrink_to_fit if the buffer size is smaller than its capacity,
  /// reallocate to fit more tightly in memory. Set to false to avoid
  /// a reallocation, at the expense of potentially more memory consumption.
  /// \return Status
  Status Finish(std::shared_ptr<Buffer>* out, bool shrink_to_fit = true) {
    ARROW_RETURN_NOT_OK(Resize(size_, shrink_to_fit));
    if (size_ != 0) buffer_->ZeroPadding();
    *out = buffer_;
    if (*out == NULLPTR) {
      // Nothing was ever appended (buffer_ still null): hand out a valid,
      // zero-length buffer instead of a null pointer.
      ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(0, pool_));
    }
    Reset();
    return Status::OK();
  }

  /// \brief Discard the internal buffer and reset sizes to zero.
  /// NOTE(review): data_ is intentionally left pointing at the released
  /// buffer's memory; it is refreshed on the next Resize/Append. Callers
  /// presumably must not use data()/mutable_data() after Reset() — confirm.
  void Reset() {
    buffer_ = NULLPTR;
    capacity_ = size_ = 0;
  }

  /// \brief Set size to a smaller value without modifying builder
  /// contents. For reusable BufferBuilder classes
  /// \param[in] position must be non-negative and less than or equal
  /// to the current length()
  void Rewind(int64_t position) { size_ = position; }

  int64_t capacity() const { return capacity_; }
  int64_t length() const { return size_; }
  const uint8_t* data() const { return data_; }
  uint8_t* mutable_data() { return data_; }

 private:
  std::shared_ptr<ResizableBuffer> buffer_;  // owning storage (may be null)
  MemoryPool* pool_;                         // allocation source, not owned
  uint8_t* data_;      // cached buffer_->mutable_data(), never null (see ctor)
  int64_t capacity_;   // bytes allocated
  int64_t size_;       // bytes appended so far
};
187 
188 template <typename T, typename Enable = void>
189 class TypedBufferBuilder;
190 
191 /// \brief A BufferBuilder for building a buffer of arithmetic elements
192 template <typename T>
193 class TypedBufferBuilder<
194     T, typename std::enable_if<std::is_arithmetic<T>::value ||
195                                std::is_standard_layout<T>::value>::type> {
196  public:
197   explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool())
bytes_builder_(pool)198       : bytes_builder_(pool) {}
199 
200   explicit TypedBufferBuilder(std::shared_ptr<ResizableBuffer> buffer,
201                               MemoryPool* pool = default_memory_pool())
bytes_builder_(std::move (buffer),pool)202       : bytes_builder_(std::move(buffer), pool) {}
203 
Append(T value)204   Status Append(T value) {
205     return bytes_builder_.Append(reinterpret_cast<uint8_t*>(&value), sizeof(T));
206   }
207 
Append(const T * values,int64_t num_elements)208   Status Append(const T* values, int64_t num_elements) {
209     return bytes_builder_.Append(reinterpret_cast<const uint8_t*>(values),
210                                  num_elements * sizeof(T));
211   }
212 
Append(const int64_t num_copies,T value)213   Status Append(const int64_t num_copies, T value) {
214     ARROW_RETURN_NOT_OK(Reserve(num_copies + length()));
215     UnsafeAppend(num_copies, value);
216     return Status::OK();
217   }
218 
UnsafeAppend(T value)219   void UnsafeAppend(T value) {
220     bytes_builder_.UnsafeAppend(reinterpret_cast<uint8_t*>(&value), sizeof(T));
221   }
222 
UnsafeAppend(const T * values,int64_t num_elements)223   void UnsafeAppend(const T* values, int64_t num_elements) {
224     bytes_builder_.UnsafeAppend(reinterpret_cast<const uint8_t*>(values),
225                                 num_elements * sizeof(T));
226   }
227 
228   template <typename Iter>
UnsafeAppend(Iter values_begin,Iter values_end)229   void UnsafeAppend(Iter values_begin, Iter values_end) {
230     int64_t num_elements = static_cast<int64_t>(std::distance(values_begin, values_end));
231     auto data = mutable_data() + length();
232     bytes_builder_.UnsafeAdvance(num_elements * sizeof(T));
233     std::copy(values_begin, values_end, data);
234   }
235 
UnsafeAppend(const int64_t num_copies,T value)236   void UnsafeAppend(const int64_t num_copies, T value) {
237     auto data = mutable_data() + length();
238     bytes_builder_.UnsafeAdvance(num_copies * sizeof(T));
239     std::fill(data, data + num_copies, value);
240   }
241 
242   Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) {
243     return bytes_builder_.Resize(new_capacity * sizeof(T), shrink_to_fit);
244   }
245 
Reserve(const int64_t additional_elements)246   Status Reserve(const int64_t additional_elements) {
247     return bytes_builder_.Reserve(additional_elements * sizeof(T));
248   }
249 
Advance(const int64_t length)250   Status Advance(const int64_t length) {
251     return bytes_builder_.Advance(length * sizeof(T));
252   }
253 
254   Status Finish(std::shared_ptr<Buffer>* out, bool shrink_to_fit = true) {
255     return bytes_builder_.Finish(out, shrink_to_fit);
256   }
257 
Reset()258   void Reset() { bytes_builder_.Reset(); }
259 
length()260   int64_t length() const { return bytes_builder_.length() / sizeof(T); }
capacity()261   int64_t capacity() const { return bytes_builder_.capacity() / sizeof(T); }
data()262   const T* data() const { return reinterpret_cast<const T*>(bytes_builder_.data()); }
mutable_data()263   T* mutable_data() { return reinterpret_cast<T*>(bytes_builder_.mutable_data()); }
264 
265  private:
266   BufferBuilder bytes_builder_;
267 };
268 
/// \brief A BufferBuilder for building a buffer containing a bitmap
template <>
class TypedBufferBuilder<bool> {
 public:
  explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool())
      : bytes_builder_(pool) {}

  /// \brief Append a single bit, expanding the bitmap if necessary.
  Status Append(bool value) {
    ARROW_RETURN_NOT_OK(Reserve(1));
    UnsafeAppend(value);
    return Status::OK();
  }

  /// \brief Append num_elements bits, one per byte of valid_bytes
  /// (non-zero byte => set bit).
  Status Append(const uint8_t* valid_bytes, int64_t num_elements) {
    ARROW_RETURN_NOT_OK(Reserve(num_elements));
    UnsafeAppend(valid_bytes, num_elements);
    return Status::OK();
  }

  /// \brief Append num_copies copies of the same bit value.
  Status Append(const int64_t num_copies, bool value) {
    ARROW_RETURN_NOT_OK(Reserve(num_copies));
    UnsafeAppend(num_copies, value);
    return Status::OK();
  }

  /// \brief Append one bit without checking capacity (call Reserve first).
  /// Also maintains the running false_count_ tally.
  void UnsafeAppend(bool value) {
    BitUtil::SetBitTo(mutable_data(), bit_length_, value);
    if (!value) {
      ++false_count_;
    }
    ++bit_length_;
  }

  /// \brief Append num_elements bits from a byte-per-value array without
  /// checking capacity.
  void UnsafeAppend(const uint8_t* bytes, int64_t num_elements) {
    if (num_elements == 0) return;
    int64_t i = 0;
    internal::GenerateBitsUnrolled(mutable_data(), bit_length_, num_elements, [&] {
      bool value = bytes[i++];
      false_count_ += !value;
      return value;
    });
    bit_length_ += num_elements;
  }

  /// \brief Append num_copies copies of value without checking capacity.
  void UnsafeAppend(const int64_t num_copies, bool value) {
    BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value);
    // All copies are identical, so the false count changes by either
    // 0 or num_copies.
    false_count_ += num_copies * !value;
    bit_length_ += num_copies;
  }

  /// \brief Append num_elements bits produced by calling gen() repeatedly,
  /// without checking capacity. When count_falses is false, false_count_ is
  /// NOT updated — the caller is responsible for keeping it accurate.
  template <bool count_falses, typename Generator>
  void UnsafeAppend(const int64_t num_elements, Generator&& gen) {
    if (num_elements == 0) return;

    if (count_falses) {
      internal::GenerateBitsUnrolled(mutable_data(), bit_length_, num_elements, [&] {
        bool value = gen();
        false_count_ += !value;
        return value;
      });
    } else {
      internal::GenerateBitsUnrolled(mutable_data(), bit_length_, num_elements,
                                     std::forward<Generator>(gen));
    }
    bit_length_ += num_elements;
  }

  /// \brief Resize to hold new_capacity bits. Any newly-allocated bytes are
  /// zero-initialized so subsequent appends can simply bump the length.
  Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) {
    const int64_t old_byte_capacity = bytes_builder_.capacity();
    ARROW_RETURN_NOT_OK(
        bytes_builder_.Resize(BitUtil::BytesForBits(new_capacity), shrink_to_fit));
    // Resize() may have chosen a larger capacity (e.g. for padding),
    // so ask it again before calling memset().
    const int64_t new_byte_capacity = bytes_builder_.capacity();
    if (new_byte_capacity > old_byte_capacity) {
      // The additional buffer space is 0-initialized for convenience,
      // so that other methods can simply bump the length.
      memset(mutable_data() + old_byte_capacity, 0,
             static_cast<size_t>(new_byte_capacity - old_byte_capacity));
    }
    return Status::OK();
  }

  /// \brief Ensure space for additional_elements more bits.
  Status Reserve(const int64_t additional_elements) {
    return Resize(
        BufferBuilder::GrowByFactor(bit_length_, bit_length_ + additional_elements),
        false);
  }

  /// \brief Advance the length by `length` bits. The new bits are zero
  /// (Resize zero-fills), hence they are all counted as false.
  Status Advance(const int64_t length) {
    ARROW_RETURN_NOT_OK(Reserve(length));
    bit_length_ += length;
    false_count_ += length;
    return Status::OK();
  }

  /// \brief Return the bitmap as a Buffer and reset the builder.
  Status Finish(std::shared_ptr<Buffer>* out, bool shrink_to_fit = true) {
    // set bytes_builder_.size_ == byte size of data
    bytes_builder_.UnsafeAdvance(BitUtil::BytesForBits(bit_length_) -
                                 bytes_builder_.length());
    bit_length_ = false_count_ = 0;
    return bytes_builder_.Finish(out, shrink_to_fit);
  }

  void Reset() {
    bytes_builder_.Reset();
    bit_length_ = false_count_ = 0;
  }

  int64_t length() const { return bit_length_; }
  int64_t capacity() const { return bytes_builder_.capacity() * 8; }
  const uint8_t* data() const { return bytes_builder_.data(); }
  uint8_t* mutable_data() { return bytes_builder_.mutable_data(); }
  /// \brief Number of false bits appended so far (useful for null counts).
  int64_t false_count() const { return false_count_; }

 private:
  BufferBuilder bytes_builder_;  // underlying byte storage
  int64_t bit_length_ = 0;       // logical length in bits
  int64_t false_count_ = 0;      // running tally of zero bits
};
389 
390 }  // namespace arrow
391