1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include <algorithm>
19 #include <cstdint>
20 #include <cstring>
21 #include <limits>
22 #include <memory>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include <gtest/gtest.h>
28 
29 #include "arrow/buffer.h"
30 #include "arrow/buffer_builder.h"
31 #include "arrow/device.h"
32 #include "arrow/io/interfaces.h"
33 #include "arrow/memory_pool.h"
34 #include "arrow/status.h"
35 #include "arrow/testing/gtest_util.h"
36 #include "arrow/util/checked_cast.h"
37 
38 namespace arrow {
39 
40 using internal::checked_cast;
41 using internal::checked_pointer_cast;
42 
// Name returned by MyDevice::type_name(); MyDevice::Equals() checks other
// devices against this same array.
static constexpr char kMyDeviceTypeName[] = "arrowtest::MyDevice";

// Values accepted by the MyDevice constructor.  MyDevice::allow_copy() is true
// only for kMyDeviceAllowCopy and MyDevice::allow_view() only for
// kMyDeviceAllowView; any other value permits neither.
static constexpr int kMyDeviceAllowCopy = 1;
static constexpr int kMyDeviceAllowView = 2;
static constexpr int kMyDeviceDisallowCopyView = 3;
48 
49 class MyDevice : public Device {
50  public:
MyDevice(int value)51   explicit MyDevice(int value) : Device(), value_(value) {}
52 
type_name() const53   const char* type_name() const override { return kMyDeviceTypeName; }
54 
ToString() const55   std::string ToString() const override {
56     switch (value_) {
57       case kMyDeviceAllowCopy:
58         return "MyDevice[noview]";
59       case kMyDeviceAllowView:
60         return "MyDevice[nocopy]";
61       default:
62         return "MyDevice[nocopy][noview]";
63     }
64   }
65 
Equals(const Device & other) const66   bool Equals(const Device& other) const override {
67     if (other.type_name() != kMyDeviceTypeName) {
68       return false;
69     }
70     return checked_cast<const MyDevice&>(other).value_ == value_;
71   }
72 
73   std::shared_ptr<MemoryManager> default_memory_manager() override;
74 
value() const75   int value() const { return value_; }
76 
allow_copy() const77   bool allow_copy() const { return value_ == kMyDeviceAllowCopy; }
78 
allow_view() const79   bool allow_view() const { return value_ == kMyDeviceAllowView; }
80 
81  protected:
82   int value_;
83 };
84 
85 class MyMemoryManager : public MemoryManager {
86  public:
MyMemoryManager(std::shared_ptr<Device> device)87   explicit MyMemoryManager(std::shared_ptr<Device> device) : MemoryManager(device) {}
88 
allow_copy() const89   bool allow_copy() const {
90     return checked_cast<const MyDevice&>(*device()).allow_copy();
91   }
92 
allow_view() const93   bool allow_view() const {
94     return checked_cast<const MyDevice&>(*device()).allow_view();
95   }
96 
GetBufferReader(std::shared_ptr<Buffer> buf)97   Result<std::shared_ptr<io::RandomAccessFile>> GetBufferReader(
98       std::shared_ptr<Buffer> buf) override {
99     return Status::NotImplemented("");
100   }
101 
AllocateBuffer(int64_t size)102   Result<std::shared_ptr<Buffer>> AllocateBuffer(int64_t size) override {
103     return Status::NotImplemented("");
104   }
105 
GetBufferWriter(std::shared_ptr<Buffer> buf)106   Result<std::shared_ptr<io::OutputStream>> GetBufferWriter(
107       std::shared_ptr<Buffer> buf) override {
108     return Status::NotImplemented("");
109   }
110 
111  protected:
112   Result<std::shared_ptr<Buffer>> CopyBufferFrom(
113       const std::shared_ptr<Buffer>& buf,
114       const std::shared_ptr<MemoryManager>& from) override;
115   Result<std::shared_ptr<Buffer>> CopyBufferTo(
116       const std::shared_ptr<Buffer>& buf,
117       const std::shared_ptr<MemoryManager>& to) override;
118   Result<std::shared_ptr<Buffer>> ViewBufferFrom(
119       const std::shared_ptr<Buffer>& buf,
120       const std::shared_ptr<MemoryManager>& from) override;
121   Result<std::shared_ptr<Buffer>> ViewBufferTo(
122       const std::shared_ptr<Buffer>& buf,
123       const std::shared_ptr<MemoryManager>& to) override;
124 };
125 
126 class MyBuffer : public Buffer {
127  public:
MyBuffer(std::shared_ptr<MemoryManager> mm,const std::shared_ptr<Buffer> & parent)128   MyBuffer(std::shared_ptr<MemoryManager> mm, const std::shared_ptr<Buffer>& parent)
129       : Buffer(parent->data(), parent->size()) {
130     parent_ = parent;
131     SetMemoryManager(mm);
132   }
133 };
134 
default_memory_manager()135 std::shared_ptr<MemoryManager> MyDevice::default_memory_manager() {
136   return std::make_shared<MyMemoryManager>(shared_from_this());
137 }
138 
CopyBufferFrom(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & from)139 Result<std::shared_ptr<Buffer>> MyMemoryManager::CopyBufferFrom(
140     const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
141   if (!allow_copy()) {
142     return nullptr;
143   }
144   if (from->is_cpu()) {
145     // CPU to MyDevice:
146     // 1. CPU to CPU
147     ARROW_ASSIGN_OR_RAISE(auto dest,
148                           MemoryManager::CopyBuffer(buf, default_cpu_memory_manager()));
149     // 2. Wrap CPU buffer result
150     return std::make_shared<MyBuffer>(shared_from_this(), dest);
151   }
152   return nullptr;
153 }
154 
CopyBufferTo(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & to)155 Result<std::shared_ptr<Buffer>> MyMemoryManager::CopyBufferTo(
156     const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
157   if (!allow_copy()) {
158     return nullptr;
159   }
160   if (to->is_cpu() && buf->parent()) {
161     // MyDevice to CPU
162     return MemoryManager::CopyBuffer(buf->parent(), to);
163   }
164   return nullptr;
165 }
166 
ViewBufferFrom(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & from)167 Result<std::shared_ptr<Buffer>> MyMemoryManager::ViewBufferFrom(
168     const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
169   if (!allow_view()) {
170     return nullptr;
171   }
172   if (from->is_cpu()) {
173     // CPU on MyDevice: wrap CPU buffer
174     return std::make_shared<MyBuffer>(shared_from_this(), buf);
175   }
176   return nullptr;
177 }
178 
ViewBufferTo(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & to)179 Result<std::shared_ptr<Buffer>> MyMemoryManager::ViewBufferTo(
180     const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
181   if (!allow_view()) {
182     return nullptr;
183   }
184   if (to->is_cpu() && buf->parent()) {
185     // MyDevice on CPU: unwrap buffer
186     return buf->parent();
187   }
188   return nullptr;
189 }
190 
191 // Like AssertBufferEqual, but doesn't call Buffer::data()
AssertMyBufferEqual(const Buffer & buffer,util::string_view expected)192 void AssertMyBufferEqual(const Buffer& buffer, util::string_view expected) {
193   ASSERT_EQ(util::string_view(buffer), expected);
194 }
195 
AssertIsCPUBuffer(const Buffer & buf)196 void AssertIsCPUBuffer(const Buffer& buf) {
197   ASSERT_TRUE(buf.is_cpu());
198   ASSERT_EQ(*buf.device(), *CPUDevice::Instance());
199 }
200 
201 class TestDevice : public ::testing::Test {
202  public:
SetUp()203   void SetUp() {
204     cpu_device_ = CPUDevice::Instance();
205     my_copy_device_ = std::make_shared<MyDevice>(kMyDeviceAllowCopy);
206     my_view_device_ = std::make_shared<MyDevice>(kMyDeviceAllowView);
207     my_other_device_ = std::make_shared<MyDevice>(kMyDeviceDisallowCopyView);
208 
209     cpu_mm_ = cpu_device_->default_memory_manager();
210     my_copy_mm_ = my_copy_device_->default_memory_manager();
211     my_view_mm_ = my_view_device_->default_memory_manager();
212     my_other_mm_ = my_other_device_->default_memory_manager();
213 
214     cpu_src_ = Buffer::FromString("some data");
215     my_copy_src_ = std::make_shared<MyBuffer>(my_copy_mm_, cpu_src_);
216     my_view_src_ = std::make_shared<MyBuffer>(my_view_mm_, cpu_src_);
217     my_other_src_ = std::make_shared<MyBuffer>(my_other_mm_, cpu_src_);
218   }
219 
220  protected:
221   std::shared_ptr<Device> cpu_device_, my_copy_device_, my_view_device_, my_other_device_;
222   std::shared_ptr<MemoryManager> cpu_mm_, my_copy_mm_, my_view_mm_, my_other_mm_;
223   std::shared_ptr<Buffer> cpu_src_, my_copy_src_, my_view_src_, my_other_src_;
224 };
225 
// Device equality and the is_cpu() flag on devices and memory managers.
TEST_F(TestDevice, Basics) {
  ASSERT_TRUE(cpu_device_->is_cpu());

  const Device& cpu = *cpu_device_;
  const Device& copy_only = *my_copy_device_;

  // Equality is reflexive, and a CPU device never equals a MyDevice,
  // whichever side it is on.
  ASSERT_EQ(cpu, cpu);
  ASSERT_EQ(copy_only, copy_only);
  ASSERT_NE(cpu, copy_only);
  ASSERT_NE(copy_only, cpu);

  // Only the CPU memory manager reports is_cpu().
  ASSERT_TRUE(cpu_mm_->is_cpu());
  ASSERT_FALSE(my_copy_mm_->is_cpu());
  ASSERT_FALSE(my_other_mm_->is_cpu());
}
238 
// Exercises MemoryManager::CopyBuffer between the CPU and the test devices.
// Each successful copy must land on the requested device, live at a different
// address than its source, and hold the same bytes.  In release builds
// (NDEBUG) data() of a non-CPU buffer is additionally expected to be null.
TEST_F(TestDevice, Copy) {
  // CPU-to-CPU
  ASSERT_OK_AND_ASSIGN(auto buffer, MemoryManager::CopyBuffer(cpu_src_, cpu_mm_));
  ASSERT_EQ(buffer->device(), cpu_device_);
  ASSERT_TRUE(buffer->is_cpu());
  ASSERT_NE(buffer->address(), cpu_src_->address());
  ASSERT_NE(buffer->data(), nullptr);
  AssertBufferEqual(*buffer, "some data");

  // CPU-to-device
  ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(cpu_src_, my_copy_mm_));
  ASSERT_EQ(buffer->device(), my_copy_device_);
  ASSERT_FALSE(buffer->is_cpu());
  ASSERT_NE(buffer->address(), cpu_src_->address());
#ifdef NDEBUG
  ASSERT_EQ(buffer->data(), nullptr);
#endif
  AssertMyBufferEqual(*buffer, "some data");

  // Device-to-CPU
  ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_copy_src_, cpu_mm_));
  ASSERT_EQ(buffer->device(), cpu_device_);
  ASSERT_TRUE(buffer->is_cpu());
  ASSERT_NE(buffer->address(), my_copy_src_->address());
  ASSERT_NE(buffer->data(), nullptr);
  AssertBufferEqual(*buffer, "some data");

  // Device-to-device with an intermediate CPU copy
  ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_copy_src_, my_copy_mm_));
  ASSERT_EQ(buffer->device(), my_copy_device_);
  ASSERT_FALSE(buffer->is_cpu());
  ASSERT_NE(buffer->address(), my_copy_src_->address());
#ifdef NDEBUG
  ASSERT_EQ(buffer->data(), nullptr);
#endif
  AssertMyBufferEqual(*buffer, "some data");

  // Device-to-device with an intermediate view on CPU, then a copy from CPU to device
  ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_view_src_, my_copy_mm_));
  ASSERT_EQ(buffer->device(), my_copy_device_);
  ASSERT_FALSE(buffer->is_cpu());
  // The copy must not alias the buffer that was actually used as the source.
  ASSERT_NE(buffer->address(), my_view_src_->address());
#ifdef NDEBUG
  ASSERT_EQ(buffer->data(), nullptr);
#endif
  AssertMyBufferEqual(*buffer, "some data");

  // A device that allows neither copies nor views cannot take part in a copy.
  ASSERT_RAISES(NotImplemented, MemoryManager::CopyBuffer(cpu_src_, my_other_mm_));
  ASSERT_RAISES(NotImplemented, MemoryManager::CopyBuffer(my_other_src_, cpu_mm_));
}
289 
// Exercises MemoryManager::ViewBuffer between the CPU and the test devices.
// A successful view must land on the requested device and share the address of
// its source, i.e. no bytes are copied.  In release builds (NDEBUG) data() of
// a non-CPU buffer is additionally expected to be null.
TEST_F(TestDevice, View) {
  // CPU-on-CPU
  ASSERT_OK_AND_ASSIGN(auto buffer, MemoryManager::ViewBuffer(cpu_src_, cpu_mm_));
  ASSERT_EQ(buffer->device(), cpu_device_);
  ASSERT_TRUE(buffer->is_cpu());
  ASSERT_EQ(buffer->address(), cpu_src_->address());
  ASSERT_NE(buffer->data(), nullptr);
  AssertBufferEqual(*buffer, "some data");

  // CPU-on-device
  ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::ViewBuffer(cpu_src_, my_view_mm_));
  ASSERT_EQ(buffer->device(), my_view_device_);
  ASSERT_FALSE(buffer->is_cpu());
  ASSERT_EQ(buffer->address(), cpu_src_->address());
#ifdef NDEBUG
  ASSERT_EQ(buffer->data(), nullptr);
#endif
  AssertMyBufferEqual(*buffer, "some data");

  // Device-on-CPU
  ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::ViewBuffer(my_view_src_, cpu_mm_));
  ASSERT_EQ(buffer->device(), cpu_device_);
  ASSERT_TRUE(buffer->is_cpu());
  // The view must alias the buffer that was actually used as the source.
  ASSERT_EQ(buffer->address(), my_view_src_->address());
  ASSERT_NE(buffer->data(), nullptr);
  AssertBufferEqual(*buffer, "some data");

  // A device that allows neither copies nor views cannot take part in a view;
  // this has to go through ViewBuffer so the view path is the one under test.
  ASSERT_RAISES(NotImplemented, MemoryManager::ViewBuffer(cpu_src_, my_other_mm_));
  ASSERT_RAISES(NotImplemented, MemoryManager::ViewBuffer(my_other_src_, cpu_mm_));
}
320 
// AllocateBuffer attaches a CPU memory manager whose pool is the pool that was
// used for the allocation.
TEST(TestAllocate, Basics) {
  // Default pool.
  ASSERT_OK_AND_ASSIGN(auto allocated, AllocateBuffer(1024));
  auto manager = allocated->memory_manager();
  ASSERT_TRUE(manager->is_cpu());
  ASSERT_EQ(manager.get(), default_cpu_memory_manager().get());
  auto cpu_manager = checked_pointer_cast<CPUMemoryManager>(manager);
  ASSERT_EQ(cpu_manager->pool(), default_memory_pool());

  // Explicit proxy pool.
  auto proxy_pool = std::make_shared<ProxyMemoryPool>(default_memory_pool());
  ASSERT_OK_AND_ASSIGN(allocated, AllocateBuffer(1024, proxy_pool.get()));
  manager = allocated->memory_manager();
  ASSERT_TRUE(manager->is_cpu());
  cpu_manager = checked_pointer_cast<CPUMemoryManager>(manager);
  ASSERT_EQ(cpu_manager->pool(), proxy_pool.get());
  // The buffer has to be released before the pool it came from goes away.
  allocated.reset();
}
337 
// A 100-bit bitmap occupies at least 13 bytes and its capacity is a multiple
// of 8.
TEST(TestAllocate, Bitmap) {
  ASSERT_OK_AND_ASSIGN(auto bitmap, AllocateBitmap(100));
  AssertIsCPUBuffer(*bitmap);
  EXPECT_GE(bitmap->size(), 13);
  EXPECT_EQ(bitmap->capacity() % 8, 0);
}
344 
// An empty 100-bit bitmap is exactly 13 bytes long and every byte up to its
// capacity is zero.
TEST(TestAllocate, EmptyBitmap) {
  ASSERT_OK_AND_ASSIGN(auto bitmap, AllocateEmptyBitmap(100));
  AssertIsCPUBuffer(*bitmap);
  EXPECT_EQ(bitmap->size(), 13);
  EXPECT_EQ(bitmap->capacity() % 8, 0);

  const auto is_zero = [](int8_t byte) { return byte == 0; };
  const uint8_t* first = bitmap->data();
  const uint8_t* last = bitmap->data() + bitmap->capacity();
  EXPECT_TRUE(std::all_of(first, last, is_zero));
}
353 
// A Buffer constructed from a std::string exposes the string's bytes and size.
TEST(TestBuffer, FromStdString) {
  const std::string text = "hello, world";

  Buffer buf(text);
  AssertIsCPUBuffer(buf);

  const int cmp = memcmp(buf.data(), text.c_str(), text.size());
  ASSERT_EQ(0, cmp);
  ASSERT_EQ(static_cast<int64_t>(text.size()), buf.size());
}
362 
// Buffer::FromString must own its bytes: the buffer is still readable after the
// string it was created from has been destroyed.
TEST(TestBuffer, FromStdStringWithMemory) {
  const std::string expected = "hello, world";
  std::shared_ptr<Buffer> owned;

  {
    std::string scoped = "hello, world";
    owned = Buffer::FromString(scoped);
    AssertIsCPUBuffer(*owned);
    ASSERT_EQ(0, memcmp(owned->data(), scoped.c_str(), scoped.size()));
    ASSERT_EQ(static_cast<int64_t>(scoped.size()), owned->size());
  }

  // `scoped` is gone now; the contents must still match if the buffer really
  // owns its storage.
  ASSERT_EQ(0, memcmp(owned->data(), expected.c_str(), expected.size()));
  ASSERT_EQ(static_cast<int64_t>(expected.size()), owned->size());
}
380 
// Buffers over distinct memory compare equal when their bytes match and
// unequal when they differ.
TEST(TestBuffer, EqualsWithSameContent) {
  MemoryPool* pool = default_memory_pool();
  const int32_t kNumBytes = 128 * 1024;

  // Two regions filled with the same byte value...
  uint8_t* same_a;
  ASSERT_OK(pool->Allocate(kNumBytes, &same_a));
  memset(same_a, 12, kNumBytes);
  uint8_t* same_b;
  ASSERT_OK(pool->Allocate(kNumBytes, &same_b));
  memset(same_b, 12, kNumBytes);
  // ...and a third region filled with a different one.
  uint8_t* different;
  ASSERT_OK(pool->Allocate(kNumBytes, &different));
  memset(different, 3, kNumBytes);

  Buffer buffer_a(same_a, kNumBytes);
  Buffer buffer_b(same_b, kNumBytes);
  Buffer buffer_c(different, kNumBytes);
  ASSERT_TRUE(buffer_a.Equals(buffer_b));
  ASSERT_FALSE(buffer_a.Equals(buffer_c));

  pool->Free(same_a, kNumBytes);
  pool->Free(same_b, kNumBytes);
  pool->Free(different, kNumBytes);
}
404 
// Buffers over the same memory compare equal, including the bounded form of
// Equals as long as the requested byte count fits in both buffers.
TEST(TestBuffer, EqualsWithSameBuffer) {
  MemoryPool* pool = default_memory_pool();
  const int32_t kNumBytes = 128 * 1024;
  uint8_t* storage;
  ASSERT_OK(pool->Allocate(kNumBytes, &storage));
  memset(storage, 111, kNumBytes);

  Buffer whole(storage, kNumBytes);
  Buffer whole_again(storage, kNumBytes);
  ASSERT_TRUE(whole.Equals(whole_again));

  // A buffer covering only the first half of the same memory.
  const int64_t half = kNumBytes / 2;
  Buffer first_half(storage, half);
  ASSERT_TRUE(whole.Equals(first_half, half));
  ASSERT_FALSE(whole.Equals(first_half, half + 1));

  pool->Free(storage, kNumBytes);
}
423 
// CopySlice returns a CPU buffer holding the requested byte range, with the
// bytes between size() and capacity() set to zero.
TEST(TestBuffer, CopySlice) {
  const std::string text = "some data to copy";
  const auto* bytes = reinterpret_cast<const uint8_t*>(text.c_str());

  Buffer source(bytes, text.size());

  ASSERT_OK_AND_ASSIGN(auto copied, source.CopySlice(5, 4));
  AssertIsCPUBuffer(*copied);

  Buffer expected(bytes + 5, 4);
  ASSERT_TRUE(copied->Equals(expected));

  // The padding after the copied bytes must be zeroed.
  std::vector<uint8_t> zero_padding(copied->capacity() - copied->size());
  ASSERT_EQ(0, memcmp(copied->data() + copied->size(), zero_padding.data(),
                      zero_padding.size()));
}
440 
// Zero-length CopySlice calls yield an empty buffer, for an empty source as
// well as at the start and the end of a non-empty one.
TEST(TestBuffer, CopySliceEmpty) {
  // Empty source.
  auto source = std::make_shared<Buffer>("");
  ASSERT_OK_AND_ASSIGN(auto copied, source->CopySlice(0, 0));
  AssertBufferEqual(*copied, "");

  // Non-empty source: slice at offset 0, then at the very end.
  source = std::make_shared<Buffer>("1234");
  ASSERT_OK_AND_ASSIGN(copied, source->CopySlice(0, 0));
  AssertBufferEqual(*copied, "");
  ASSERT_OK_AND_ASSIGN(copied, source->CopySlice(4, 0));
  AssertBufferEqual(*copied, "");
}
452 
// ToHexString renders every byte as two upper-case hexadecimal digits,
// including non-printable (0x07) and high (0xA9) bytes.
TEST(TestBuffer, ToHexString) {
  const uint8_t data_array[] = "\a0hex string\xa9";

  // Wrap the array directly instead of going through
  // std::basic_string<uint8_t>, which needs a std::char_traits<unsigned char>
  // specialization that the C++ standard does not provide.  The literal has no
  // embedded NUL, so dropping the trailing terminator gives the same bytes.
  Buffer buf(data_array, static_cast<int64_t>(sizeof(data_array) - 1));

  ASSERT_EQ(buf.ToHexString(), std::string("073068657820737472696E67A9"));
}
463 
// SliceBuffer yields a CPU buffer over the requested range and adds one
// reference to the sliced buffer.
TEST(TestBuffer, SliceBuffer) {
  const std::string text = "some data to slice";
  const auto* bytes = reinterpret_cast<const uint8_t*>(text.c_str());

  auto source = std::make_shared<Buffer>(bytes, text.size());

  std::shared_ptr<Buffer> sliced = SliceBuffer(source, 5, 4);
  AssertIsCPUBuffer(*sliced);

  Buffer expected(bytes + 5, 4);
  ASSERT_TRUE(sliced->Equals(expected));

  // One reference held by `source`, one more since the slice was created.
  ASSERT_EQ(2, source.use_count());
}
478 
// MutableBuffer::Wrap does not copy: writes through the buffer show up in the
// wrapped vector.
TEST(TestMutableBuffer, Wrap) {
  std::vector<int32_t> numbers = {1, 2, 3};

  auto wrapped = MutableBuffer::Wrap(numbers.data(), numbers.size());
  AssertIsCPUBuffer(*wrapped);

  auto* as_int32 = reinterpret_cast<int32_t*>(wrapped->mutable_data());
  as_int32[1] = 4;

  ASSERT_EQ(4, numbers[1]);
}
488 
// Buffer::FromString on a moved-from string gives an immutable CPU buffer that
// stays valid after the original string object is destroyed.
TEST(TestBuffer, FromStringRvalue) {
  const std::string expected = "input data";

  std::shared_ptr<Buffer> result;
  {
    std::string scoped = "input data";
    result = Buffer::FromString(std::move(scoped));
    AssertIsCPUBuffer(*result);
  }

  ASSERT_FALSE(result->is_mutable());

  const int cmp = memcmp(result->data(), expected.c_str(), expected.size());
  ASSERT_EQ(0, cmp);
  ASSERT_EQ(static_cast<int64_t>(expected.size()), result->size());
}
504 
// SliceMutableBuffer yields a mutable CPU buffer of the requested length over
// the requested range.
TEST(TestBuffer, SliceMutableBuffer) {
  const std::string text = "some data to slice";
  const auto* bytes = reinterpret_cast<const uint8_t*>(text.c_str());

  // Fill the start of a freshly allocated buffer with the text.
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> backing, AllocateBuffer(50));
  memcpy(backing->mutable_data(), bytes, text.size());

  std::shared_ptr<Buffer> sliced = SliceMutableBuffer(backing, 5, 10);
  AssertIsCPUBuffer(*sliced);
  ASSERT_TRUE(sliced->is_mutable());
  ASSERT_EQ(10, sliced->size());

  Buffer expected(bytes + 5, 10);
  ASSERT_TRUE(sliced->Equals(expected));
}
521 
// Buffer::GetReader returns a reader that reports the buffer's size and serves
// positioned reads from it.
TEST(TestBuffer, GetReader) {
  const std::string text = "some data to read";
  const auto* bytes = reinterpret_cast<const uint8_t*>(text.c_str());

  auto source = std::make_shared<Buffer>(bytes, text.size());
  ASSERT_OK_AND_ASSIGN(auto reader, Buffer::GetReader(source));
  ASSERT_OK_AND_EQ(static_cast<int64_t>(text.size()), reader->GetSize());

  // Four bytes starting at offset 5.
  ASSERT_OK_AND_ASSIGN(auto chunk, reader->ReadAt(5, 4));
  AssertBufferEqual(*chunk, "data");
}
532 
// Buffer::GetWriter writes into an allocated buffer and is rejected with
// Invalid for a buffer that is not mutable.
TEST(TestBuffer, GetWriter) {
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> target, AllocateBuffer(9));
  ASSERT_OK_AND_ASSIGN(auto writer, Buffer::GetWriter(target));
  const auto* payload = reinterpret_cast<const uint8_t*>("some data");
  ASSERT_OK(writer->Write(payload, 9));
  AssertBufferEqual(*target, "some data");

  // Non-mutable buffer
  target = std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>("xxx"), 3);
  ASSERT_RAISES(Invalid, Buffer::GetWriter(target));
}
543 
544 template <typename AllocateFunction>
TestZeroSizeAllocateBuffer(MemoryPool * pool,AllocateFunction && allocate_func)545 void TestZeroSizeAllocateBuffer(MemoryPool* pool, AllocateFunction&& allocate_func) {
546   auto allocated_bytes = pool->bytes_allocated();
547   {
548     std::shared_ptr<Buffer> buffer, buffer2;
549 
550     ASSERT_OK(allocate_func(pool, 0, &buffer));
551     AssertIsCPUBuffer(*buffer);
552     ASSERT_EQ(buffer->size(), 0);
553     // Even 0-sized buffers should not have a null data pointer
554     auto data = buffer->data();
555     ASSERT_NE(data, nullptr);
556     ASSERT_EQ(buffer->mutable_data(), data);
557 
558     // As an optimization, another 0-size buffer should share the same memory "area"
559     ASSERT_OK(allocate_func(pool, 0, &buffer2));
560     AssertIsCPUBuffer(*buffer2);
561     ASSERT_EQ(buffer2->size(), 0);
562     ASSERT_EQ(buffer2->data(), data);
563 
564     ASSERT_GE(pool->bytes_allocated(), allocated_bytes);
565   }
566   ASSERT_EQ(pool->bytes_allocated(), allocated_bytes);
567 }
568 
// Runs the zero-size allocation checks against AllocateBuffer.
TEST(TestAllocateBuffer, ZeroSize) {
  auto allocate_plain = [](MemoryPool* mem_pool, int64_t nbytes,
                           std::shared_ptr<Buffer>* out) {
    return AllocateBuffer(nbytes, mem_pool).Value(out);
  };
  TestZeroSizeAllocateBuffer(default_memory_pool(), allocate_plain);
}
576 
// Runs the zero-size allocation checks against AllocateResizableBuffer.
TEST(TestAllocateResizableBuffer, ZeroSize) {
  auto allocate_resizable = [](MemoryPool* mem_pool, int64_t nbytes,
                               std::shared_ptr<Buffer>* out) {
    ARROW_ASSIGN_OR_RAISE(auto allocated, AllocateResizableBuffer(nbytes, mem_pool));
    *out = std::move(allocated);
    return Status::OK();
  };
  TestZeroSizeAllocateBuffer(default_memory_pool(), allocate_resizable);
}
586 
// Resizing a resizable buffer down to zero keeps a non-null data pointer and
// lowers the pool's allocated byte count; destroying the buffer brings the
// count back to its starting value.
TEST(TestAllocateResizableBuffer, ZeroResize) {
  MemoryPool* pool = default_memory_pool();
  const auto bytes_before = pool->bytes_allocated();
  {
    std::shared_ptr<ResizableBuffer> resizable;

    ASSERT_OK_AND_ASSIGN(resizable, AllocateResizableBuffer(1000, pool));
    ASSERT_EQ(resizable->size(), 1000);
    ASSERT_NE(resizable->data(), nullptr);
    ASSERT_EQ(resizable->mutable_data(), resizable->data());

    ASSERT_GE(pool->bytes_allocated(), bytes_before + 1000);

    // Shrink all the way down to zero bytes.
    ASSERT_OK(resizable->Resize(0));
    ASSERT_NE(resizable->data(), nullptr);
    ASSERT_EQ(resizable->mutable_data(), resizable->data());

    ASSERT_GE(pool->bytes_allocated(), bytes_before);
    ASSERT_LT(pool->bytes_allocated(), bytes_before + 1000);
  }
  ASSERT_EQ(pool->bytes_allocated(), bytes_before);
}
609 
// Capacity reported by BufferBuilder after Append, Resize (with and without
// shrink_to_fit) and Reserve.
TEST(TestBufferBuilder, ResizeReserve) {
  const std::string text = "some data";
  const char* text_ptr = text.c_str();

  BufferBuilder builder;

  ASSERT_OK(builder.Append(text_ptr, 9));
  ASSERT_EQ(9, builder.length());

  ASSERT_OK(builder.Resize(128));
  ASSERT_EQ(128, builder.capacity());

  // With shrink_to_fit == false the capacity stays where it was.
  ASSERT_OK(builder.Resize(64, false));
  ASSERT_EQ(128, builder.capacity());

  // With the default arguments the capacity shrinks to the requested size.
  ASSERT_OK(builder.Resize(64));
  ASSERT_EQ(64, builder.capacity());

  // Reserving 60 additional elements grows the capacity again.
  ASSERT_OK(builder.Reserve(60));
  ASSERT_EQ(128, builder.capacity());
}
634 
635 template <typename T>
636 class TypedTestBufferBuilder : public ::testing::Test {};
637 
638 using BufferBuilderElements = ::testing::Types<int16_t, uint32_t, double>;
639 
640 TYPED_TEST_SUITE(TypedTestBufferBuilder, BufferBuilderElements);
641 
// Appends one element, then a run of four, and checks length, capacity and the
// contents of the finished buffer.
TYPED_TEST(TypedTestBufferBuilder, BasicTypedBufferBuilderUsage) {
  TypedBufferBuilder<TypeParam> builder;

  // After the first element the capacity is 64 bytes' worth of elements.
  ASSERT_OK(builder.Append(static_cast<TypeParam>(0)));
  ASSERT_EQ(builder.length(), 1);
  ASSERT_EQ(builder.capacity(), 64 / sizeof(TypeParam));

  // Append the run 0, 1, 2, 3 in a single call.
  constexpr int kNumValues = 4;
  TypeParam run[kNumValues];
  for (int idx = 0; idx < kNumValues; ++idx) {
    run[idx] = static_cast<TypeParam>(idx);
  }
  ASSERT_OK(builder.Append(run, kNumValues));
  ASSERT_EQ(builder.length(), kNumValues + 1);

  std::shared_ptr<Buffer> finished;
  ASSERT_OK(builder.Finish(&finished));
  AssertIsCPUBuffer(*finished);

  // Element 0 is the single append; the run follows right after it.
  const auto* elements = reinterpret_cast<const TypeParam*>(finished->data());
  ASSERT_EQ(elements[0], static_cast<TypeParam>(0));
  for (int idx = 0; idx < kNumValues; ++idx) {
    ASSERT_EQ(elements[idx + 1], run[idx]);
  }
}
668 
// Append(count, value) writes `count` copies of `value`: 13 ones followed by
// 17 zeros.
TYPED_TEST(TypedTestBufferBuilder, AppendCopies) {
  constexpr int kNumOnes = 13;
  constexpr int kNumZeros = 17;

  TypedBufferBuilder<TypeParam> builder;
  ASSERT_OK(builder.Append(kNumOnes, static_cast<TypeParam>(1)));
  ASSERT_OK(builder.Append(kNumZeros, static_cast<TypeParam>(0)));
  ASSERT_EQ(builder.length(), kNumOnes + kNumZeros);

  std::shared_ptr<Buffer> finished;
  ASSERT_OK(builder.Finish(&finished));

  const auto* elements = reinterpret_cast<const TypeParam*>(finished->data());
  for (int idx = 0; idx < kNumOnes + kNumZeros; ++idx) {
    // The first 13 slots hold 1, the remaining ones 0.
    const TypeParam expected = static_cast<TypeParam>(idx < kNumOnes);
    ASSERT_EQ(elements[idx], expected) << "index = " << idx;
  }
}
684 
// Appends one bit, then four bytes interpreted as booleans, and checks length,
// capacity, false count and the bits of the finished buffer.
TEST(TestBufferBuilder, BasicBoolBufferBuilderUsage) {
  TypedBufferBuilder<bool> builder;

  // Capacity is reported in bits.
  ASSERT_OK(builder.Append(false));
  ASSERT_EQ(builder.length(), 1);
  ASSERT_EQ(builder.capacity(), 64 * 8);

  // Bytes 0, 1, 2, 3: only the first one converts to false.
  constexpr int kNumValues = 4;
  uint8_t bytes[kNumValues];
  for (int idx = 0; idx < kNumValues; ++idx) {
    bytes[idx] = static_cast<uint8_t>(idx);
  }
  ASSERT_OK(builder.Append(bytes, kNumValues));
  ASSERT_EQ(builder.length(), kNumValues + 1);

  // The single `false` plus the zero byte.
  ASSERT_EQ(builder.false_count(), 2);

  std::shared_ptr<Buffer> finished;
  ASSERT_OK(builder.Finish(&finished));
  AssertIsCPUBuffer(*finished);

  const uint8_t* bits = finished->data();
  ASSERT_EQ(BitUtil::GetBit(bits, 0), false);
  for (int idx = 0; idx < kNumValues; ++idx) {
    ASSERT_EQ(BitUtil::GetBit(finished->data(), idx + 1), static_cast<bool>(bytes[idx]));
  }

  ASSERT_EQ(finished->size(), BitUtil::BytesForBits(kNumValues + 1));
}
713 
// Append(count, value) on the bool builder: 13 true bits followed by 17 false
// bits.
TEST(TestBufferBuilder, BoolBufferBuilderAppendCopies) {
  constexpr int kNumTrue = 13;
  constexpr int kNumFalse = 17;

  TypedBufferBuilder<bool> builder;
  ASSERT_OK(builder.Append(kNumTrue, true));
  ASSERT_OK(builder.Append(kNumFalse, false));
  ASSERT_EQ(builder.length(), kNumTrue + kNumFalse);
  ASSERT_EQ(builder.capacity(), 64 * 8);
  ASSERT_EQ(builder.false_count(), kNumFalse);

  std::shared_ptr<Buffer> finished;
  ASSERT_OK(builder.Finish(&finished));
  AssertIsCPUBuffer(*finished);

  for (int idx = 0; idx < kNumTrue + kNumFalse; ++idx) {
    EXPECT_EQ(BitUtil::GetBit(finished->data(), idx), idx < kNumTrue)
        << "index = " << idx;
  }

  ASSERT_EQ(finished->size(), BitUtil::BytesForBits(kNumTrue + kNumFalse));
}
733 
734 template <typename T>
735 class TypedTestBuffer : public ::testing::Test {};
736 
737 using BufferPtrs =
738     ::testing::Types<std::shared_ptr<ResizableBuffer>, std::unique_ptr<ResizableBuffer>>;
739 
740 TYPED_TEST_SUITE(TypedTestBuffer, BufferPtrs);
741 
// is_mutable() is false for a plain Buffer and true for MutableBuffer and for
// buffers returned by AllocateResizableBuffer.
TYPED_TEST(TypedTestBuffer, IsMutableFlag) {
  Buffer immutable_buf(nullptr, 0);
  ASSERT_FALSE(immutable_buf.is_mutable());

  MutableBuffer mutable_buf(nullptr, 0);
  ASSERT_TRUE(mutable_buf.is_mutable());
  AssertIsCPUBuffer(mutable_buf);

  TypeParam allocated;
  ASSERT_OK_AND_ASSIGN(allocated, AllocateResizableBuffer(0));
  ASSERT_TRUE(allocated->is_mutable());
  AssertIsCPUBuffer(*allocated);
}
756 
// Size and capacity of a resizable buffer while growing and shrinking, with
// and without shrink_to_fit.
TYPED_TEST(TypedTestBuffer, Resize) {
  TypeParam resizable;
  ASSERT_OK_AND_ASSIGN(resizable, AllocateResizableBuffer(0));
  AssertIsCPUBuffer(*resizable);

  // Grow in two steps.
  ASSERT_EQ(0, resizable->size());
  ASSERT_OK(resizable->Resize(100));
  ASSERT_EQ(100, resizable->size());
  ASSERT_OK(resizable->Resize(200));
  ASSERT_EQ(200, resizable->size());

  // Shrink with shrink_to_fit == true: the capacity drops as well.
  ASSERT_OK(resizable->Resize(50, true));
  ASSERT_EQ(50, resizable->size());
  // The spec requires that capacity is a multiple of 64
  ASSERT_EQ(64, resizable->capacity());

  // Grow again, then shrink with shrink_to_fit == false: the capacity stays.
  ASSERT_OK(resizable->Resize(100));
  ASSERT_EQ(128, resizable->capacity());
  ASSERT_OK(resizable->Resize(50, false));
  ASSERT_EQ(128, resizable->capacity());
}
781 
// TypedResize<double>(n) sizes the buffer in elements: the byte size is
// n * sizeof(double).
TYPED_TEST(TypedTestBuffer, TypedResize) {
  TypeParam resizable;
  ASSERT_OK_AND_ASSIGN(resizable, AllocateResizableBuffer(0));

  // Grow in two steps.
  ASSERT_EQ(0, resizable->size());
  ASSERT_OK(resizable->template TypedResize<double>(100));
  ASSERT_EQ(800, resizable->size());
  ASSERT_OK(resizable->template TypedResize<double>(200));
  ASSERT_EQ(1600, resizable->size());

  // Shrink with shrink_to_fit == true: the capacity drops as well.
  ASSERT_OK(resizable->template TypedResize<double>(50, true));
  ASSERT_EQ(400, resizable->size());
  ASSERT_EQ(448, resizable->capacity());

  // Grow again, then shrink with shrink_to_fit == false: the capacity stays.
  ASSERT_OK(resizable->template TypedResize<double>(100));
  ASSERT_EQ(832, resizable->capacity());
  ASSERT_OK(resizable->template TypedResize<double>(50, false));
  ASSERT_EQ(832, resizable->capacity());
}
801 
// Resizing to a near-maximal size must fail with OutOfMemory.
TYPED_TEST(TypedTestBuffer, ResizeOOM) {
// This test doesn't play nice with AddressSanitizer
#ifndef ADDRESS_SANITIZER
  // realloc fails, even though there may be no explicit limit
  TypeParam resizable;
  ASSERT_OK_AND_ASSIGN(resizable, AllocateResizableBuffer(0));
  ASSERT_OK(resizable->Resize(100));

  // Largest value representable both as int64_t and as size_t...
  const uint64_t int64_limit = std::numeric_limits<int64_t>::max();
  const uint64_t size_limit = std::numeric_limits<size_t>::max();
  int64_t huge = static_cast<int64_t>(std::min<uint64_t>(int64_limit, size_limit));
  // ...minus 63 to prevent overflow after the size is aligned
  huge -= 63;
  ASSERT_RAISES(OutOfMemory, resizable->Resize(huge));
#endif
}
816 
817 }  // namespace arrow
818