1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <algorithm> 21 #include <cstdint> 22 #include <cstring> 23 #include <memory> 24 #include <string> 25 #include <utility> 26 27 #include "arrow/buffer.h" 28 #include "arrow/status.h" 29 #include "arrow/util/bit_util.h" 30 #include "arrow/util/macros.h" 31 #include "arrow/util/ubsan.h" 32 #include "arrow/util/visibility.h" 33 34 namespace arrow { 35 36 // ---------------------------------------------------------------------- 37 // Buffer builder classes 38 39 /// \class BufferBuilder 40 /// \brief A class for incrementally building a contiguous chunk of in-memory 41 /// data 42 class ARROW_EXPORT BufferBuilder { 43 public: 44 explicit BufferBuilder(MemoryPool* pool = default_memory_pool()) pool_(pool)45 : pool_(pool), 46 data_(/*ensure never null to make ubsan happy and avoid check penalties below*/ 47 &util::internal::non_null_filler), 48 49 capacity_(0), 50 size_(0) {} 51 52 /// \brief Constructs new Builder that will start using 53 /// the provided buffer until Finish/Reset are called. 54 /// The buffer is not resized. 55 explicit BufferBuilder(std::shared_ptr<ResizableBuffer> buffer, 56 MemoryPool* pool = default_memory_pool()) buffer_(std::move (buffer))57 : buffer_(std::move(buffer)), 58 pool_(pool), 59 data_(buffer_->mutable_data()), 60 capacity_(buffer_->capacity()), 61 size_(buffer_->size()) {} 62 63 /// \brief Resize the buffer to the nearest multiple of 64 bytes 64 /// 65 /// \param new_capacity the new capacity of the of the builder. Will be 66 /// rounded up to a multiple of 64 bytes for padding \param shrink_to_fit if 67 /// new capacity is smaller than the existing size, reallocate internal 68 /// buffer. Set to false to avoid reallocations when shrinking the builder. 69 /// \return Status 70 Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) { 71 // Resize(0) is a no-op 72 if (new_capacity == 0) { 73 return Status::OK(); 74 } 75 if (buffer_ == NULLPTR) { 76 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(new_capacity, pool_)); 77 } else { 78 ARROW_RETURN_NOT_OK(buffer_->Resize(new_capacity, shrink_to_fit)); 79 } 80 capacity_ = buffer_->capacity(); 81 data_ = buffer_->mutable_data(); 82 return Status::OK(); 83 } 84 85 /// \brief Ensure that builder can accommodate the additional number of bytes 86 /// without the need to perform allocations 87 /// 88 /// \param[in] additional_bytes number of additional bytes to make space for 89 /// \return Status Reserve(const int64_t additional_bytes)90 Status Reserve(const int64_t additional_bytes) { 91 auto min_capacity = size_ + additional_bytes; 92 if (min_capacity <= capacity_) { 93 return Status::OK(); 94 } 95 return Resize(GrowByFactor(capacity_, min_capacity), false); 96 } 97 98 /// \brief Return a capacity expanded by the desired growth factor GrowByFactor(int64_t current_capacity,int64_t new_capacity)99 static int64_t GrowByFactor(int64_t current_capacity, int64_t new_capacity) { 100 // Doubling capacity except for large Reserve requests. 2x growth strategy 101 // (versus 1.5x) seems to have slightly better performance when using 102 // jemalloc, but significantly better performance when using the system 103 // allocator. See ARROW-6450 for further discussion 104 return std::max(new_capacity, current_capacity * 2); 105 } 106 107 /// \brief Append the given data to the buffer 108 /// 109 /// The buffer is automatically expanded if necessary. Append(const void * data,const int64_t length)110 Status Append(const void* data, const int64_t length) { 111 if (ARROW_PREDICT_FALSE(size_ + length > capacity_)) { 112 ARROW_RETURN_NOT_OK(Resize(GrowByFactor(capacity_, size_ + length), false)); 113 } 114 UnsafeAppend(data, length); 115 return Status::OK(); 116 } 117 118 /// \brief Append copies of a value to the buffer 119 /// 120 /// The buffer is automatically expanded if necessary. Append(const int64_t num_copies,uint8_t value)121 Status Append(const int64_t num_copies, uint8_t value) { 122 ARROW_RETURN_NOT_OK(Reserve(num_copies)); 123 UnsafeAppend(num_copies, value); 124 return Status::OK(); 125 } 126 127 // Advance pointer and zero out memory Advance(const int64_t length)128 Status Advance(const int64_t length) { return Append(length, 0); } 129 130 // Advance pointer, but don't allocate or zero memory UnsafeAdvance(const int64_t length)131 void UnsafeAdvance(const int64_t length) { size_ += length; } 132 133 // Unsafe methods don't check existing size UnsafeAppend(const void * data,const int64_t length)134 void UnsafeAppend(const void* data, const int64_t length) { 135 memcpy(data_ + size_, data, static_cast<size_t>(length)); 136 size_ += length; 137 } 138 UnsafeAppend(const int64_t num_copies,uint8_t value)139 void UnsafeAppend(const int64_t num_copies, uint8_t value) { 140 memset(data_ + size_, value, static_cast<size_t>(num_copies)); 141 size_ += num_copies; 142 } 143 144 /// \brief Return result of builder as a Buffer object. 145 /// 146 /// The builder is reset and can be reused afterwards. 147 /// 148 /// \param[out] out the finalized Buffer object 149 /// \param shrink_to_fit if the buffer size is smaller than its capacity, 150 /// reallocate to fit more tightly in memory. Set to false to avoid 151 /// a reallocation, at the expense of potentially more memory consumption. 152 /// \return Status 153 Status Finish(std::shared_ptr<Buffer>* out, bool shrink_to_fit = true) { 154 ARROW_RETURN_NOT_OK(Resize(size_, shrink_to_fit)); 155 if (size_ != 0) buffer_->ZeroPadding(); 156 *out = buffer_; 157 if (*out == NULLPTR) { 158 ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(0, pool_)); 159 } 160 Reset(); 161 return Status::OK(); 162 } 163 Reset()164 void Reset() { 165 buffer_ = NULLPTR; 166 capacity_ = size_ = 0; 167 } 168 169 /// \brief Set size to a smaller value without modifying builder 170 /// contents. For reusable BufferBuilder classes 171 /// \param[in] position must be non-negative and less than or equal 172 /// to the current length() Rewind(int64_t position)173 void Rewind(int64_t position) { size_ = position; } 174 capacity()175 int64_t capacity() const { return capacity_; } length()176 int64_t length() const { return size_; } data()177 const uint8_t* data() const { return data_; } mutable_data()178 uint8_t* mutable_data() { return data_; } 179 180 private: 181 std::shared_ptr<ResizableBuffer> buffer_; 182 MemoryPool* pool_; 183 uint8_t* data_; 184 int64_t capacity_; 185 int64_t size_; 186 }; 187 188 template <typename T, typename Enable = void> 189 class TypedBufferBuilder; 190 191 /// \brief A BufferBuilder for building a buffer of arithmetic elements 192 template <typename T> 193 class TypedBufferBuilder< 194 T, typename std::enable_if<std::is_arithmetic<T>::value || 195 std::is_standard_layout<T>::value>::type> { 196 public: 197 explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool()) bytes_builder_(pool)198 : bytes_builder_(pool) {} 199 200 explicit TypedBufferBuilder(std::shared_ptr<ResizableBuffer> buffer, 201 MemoryPool* pool = default_memory_pool()) bytes_builder_(std::move (buffer),pool)202 : bytes_builder_(std::move(buffer), pool) {} 203 Append(T value)204 Status Append(T value) { 205 return bytes_builder_.Append(reinterpret_cast<uint8_t*>(&value), sizeof(T)); 206 } 207 Append(const T * values,int64_t num_elements)208 Status Append(const T* values, int64_t num_elements) { 209 return bytes_builder_.Append(reinterpret_cast<const uint8_t*>(values), 210 num_elements * sizeof(T)); 211 } 212 Append(const int64_t num_copies,T value)213 Status Append(const int64_t num_copies, T value) { 214 ARROW_RETURN_NOT_OK(Reserve(num_copies + length())); 215 UnsafeAppend(num_copies, value); 216 return Status::OK(); 217 } 218 UnsafeAppend(T value)219 void UnsafeAppend(T value) { 220 bytes_builder_.UnsafeAppend(reinterpret_cast<uint8_t*>(&value), sizeof(T)); 221 } 222 UnsafeAppend(const T * values,int64_t num_elements)223 void UnsafeAppend(const T* values, int64_t num_elements) { 224 bytes_builder_.UnsafeAppend(reinterpret_cast<const uint8_t*>(values), 225 num_elements * sizeof(T)); 226 } 227 228 template <typename Iter> UnsafeAppend(Iter values_begin,Iter values_end)229 void UnsafeAppend(Iter values_begin, Iter values_end) { 230 int64_t num_elements = static_cast<int64_t>(std::distance(values_begin, values_end)); 231 auto data = mutable_data() + length(); 232 bytes_builder_.UnsafeAdvance(num_elements * sizeof(T)); 233 std::copy(values_begin, values_end, data); 234 } 235 UnsafeAppend(const int64_t num_copies,T value)236 void UnsafeAppend(const int64_t num_copies, T value) { 237 auto data = mutable_data() + length(); 238 bytes_builder_.UnsafeAdvance(num_copies * sizeof(T)); 239 std::fill(data, data + num_copies, value); 240 } 241 242 Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) { 243 return bytes_builder_.Resize(new_capacity * sizeof(T), shrink_to_fit); 244 } 245 Reserve(const int64_t additional_elements)246 Status Reserve(const int64_t additional_elements) { 247 return bytes_builder_.Reserve(additional_elements * sizeof(T)); 248 } 249 Advance(const int64_t length)250 Status Advance(const int64_t length) { 251 return bytes_builder_.Advance(length * sizeof(T)); 252 } 253 254 Status Finish(std::shared_ptr<Buffer>* out, bool shrink_to_fit = true) { 255 return bytes_builder_.Finish(out, shrink_to_fit); 256 } 257 Reset()258 void Reset() { bytes_builder_.Reset(); } 259 length()260 int64_t length() const { return bytes_builder_.length() / sizeof(T); } capacity()261 int64_t capacity() const { return bytes_builder_.capacity() / sizeof(T); } data()262 const T* data() const { return reinterpret_cast<const T*>(bytes_builder_.data()); } mutable_data()263 T* mutable_data() { return reinterpret_cast<T*>(bytes_builder_.mutable_data()); } 264 265 private: 266 BufferBuilder bytes_builder_; 267 }; 268 269 /// \brief A BufferBuilder for building a buffer containing a bitmap 270 template <> 271 class TypedBufferBuilder<bool> { 272 public: 273 explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool()) bytes_builder_(pool)274 : bytes_builder_(pool) {} 275 Append(bool value)276 Status Append(bool value) { 277 ARROW_RETURN_NOT_OK(Reserve(1)); 278 UnsafeAppend(value); 279 return Status::OK(); 280 } 281 Append(const uint8_t * valid_bytes,int64_t num_elements)282 Status Append(const uint8_t* valid_bytes, int64_t num_elements) { 283 ARROW_RETURN_NOT_OK(Reserve(num_elements)); 284 UnsafeAppend(valid_bytes, num_elements); 285 return Status::OK(); 286 } 287 Append(const int64_t num_copies,bool value)288 Status Append(const int64_t num_copies, bool value) { 289 ARROW_RETURN_NOT_OK(Reserve(num_copies)); 290 UnsafeAppend(num_copies, value); 291 return Status::OK(); 292 } 293 UnsafeAppend(bool value)294 void UnsafeAppend(bool value) { 295 BitUtil::SetBitTo(mutable_data(), bit_length_, value); 296 if (!value) { 297 ++false_count_; 298 } 299 ++bit_length_; 300 } 301 UnsafeAppend(const uint8_t * bytes,int64_t num_elements)302 void UnsafeAppend(const uint8_t* bytes, int64_t num_elements) { 303 if (num_elements == 0) return; 304 int64_t i = 0; 305 internal::GenerateBitsUnrolled(mutable_data(), bit_length_, num_elements, [&] { 306 bool value = bytes[i++]; 307 false_count_ += !value; 308 return value; 309 }); 310 bit_length_ += num_elements; 311 } 312 UnsafeAppend(const int64_t num_copies,bool value)313 void UnsafeAppend(const int64_t num_copies, bool value) { 314 BitUtil::SetBitsTo(mutable_data(), bit_length_, num_copies, value); 315 false_count_ += num_copies * !value; 316 bit_length_ += num_copies; 317 } 318 319 template <bool count_falses, typename Generator> UnsafeAppend(const int64_t num_elements,Generator && gen)320 void UnsafeAppend(const int64_t num_elements, Generator&& gen) { 321 if (num_elements == 0) return; 322 323 if (count_falses) { 324 internal::GenerateBitsUnrolled(mutable_data(), bit_length_, num_elements, [&] { 325 bool value = gen(); 326 false_count_ += !value; 327 return value; 328 }); 329 } else { 330 internal::GenerateBitsUnrolled(mutable_data(), bit_length_, num_elements, 331 std::forward<Generator>(gen)); 332 } 333 bit_length_ += num_elements; 334 } 335 336 Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) { 337 const int64_t old_byte_capacity = bytes_builder_.capacity(); 338 ARROW_RETURN_NOT_OK( 339 bytes_builder_.Resize(BitUtil::BytesForBits(new_capacity), shrink_to_fit)); 340 // Resize() may have chosen a larger capacity (e.g. for padding), 341 // so ask it again before calling memset(). 342 const int64_t new_byte_capacity = bytes_builder_.capacity(); 343 if (new_byte_capacity > old_byte_capacity) { 344 // The additional buffer space is 0-initialized for convenience, 345 // so that other methods can simply bump the length. 346 memset(mutable_data() + old_byte_capacity, 0, 347 static_cast<size_t>(new_byte_capacity - old_byte_capacity)); 348 } 349 return Status::OK(); 350 } 351 Reserve(const int64_t additional_elements)352 Status Reserve(const int64_t additional_elements) { 353 return Resize( 354 BufferBuilder::GrowByFactor(bit_length_, bit_length_ + additional_elements), 355 false); 356 } 357 Advance(const int64_t length)358 Status Advance(const int64_t length) { 359 ARROW_RETURN_NOT_OK(Reserve(length)); 360 bit_length_ += length; 361 false_count_ += length; 362 return Status::OK(); 363 } 364 365 Status Finish(std::shared_ptr<Buffer>* out, bool shrink_to_fit = true) { 366 // set bytes_builder_.size_ == byte size of data 367 bytes_builder_.UnsafeAdvance(BitUtil::BytesForBits(bit_length_) - 368 bytes_builder_.length()); 369 bit_length_ = false_count_ = 0; 370 return bytes_builder_.Finish(out, shrink_to_fit); 371 } 372 Reset()373 void Reset() { 374 bytes_builder_.Reset(); 375 bit_length_ = false_count_ = 0; 376 } 377 length()378 int64_t length() const { return bit_length_; } capacity()379 int64_t capacity() const { return bytes_builder_.capacity() * 8; } data()380 const uint8_t* data() const { return bytes_builder_.data(); } mutable_data()381 uint8_t* mutable_data() { return bytes_builder_.mutable_data(); } false_count()382 int64_t false_count() const { return false_count_; } 383 384 private: 385 BufferBuilder bytes_builder_; 386 int64_t bit_length_ = 0; 387 int64_t false_count_ = 0; 388 }; 389 390 } // namespace arrow 391