1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <algorithm>
19 #include <cstdint>
20 #include <cstring>
21 #include <limits>
22 #include <memory>
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include <gtest/gtest.h>
28
29 #include "arrow/buffer.h"
30 #include "arrow/buffer_builder.h"
31 #include "arrow/device.h"
32 #include "arrow/io/interfaces.h"
33 #include "arrow/memory_pool.h"
34 #include "arrow/status.h"
35 #include "arrow/testing/gtest_util.h"
36 #include "arrow/util/checked_cast.h"
37
38 namespace arrow {
39
40 using internal::checked_cast;
41 using internal::checked_pointer_cast;
42
43 static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
44
45 static const int kMyDeviceAllowCopy = 1;
46 static const int kMyDeviceAllowView = 2;
47 static const int kMyDeviceDisallowCopyView = 3;
48
49 class MyDevice : public Device {
50 public:
MyDevice(int value)51 explicit MyDevice(int value) : Device(), value_(value) {}
52
type_name() const53 const char* type_name() const override { return kMyDeviceTypeName; }
54
ToString() const55 std::string ToString() const override {
56 switch (value_) {
57 case kMyDeviceAllowCopy:
58 return "MyDevice[noview]";
59 case kMyDeviceAllowView:
60 return "MyDevice[nocopy]";
61 default:
62 return "MyDevice[nocopy][noview]";
63 }
64 }
65
Equals(const Device & other) const66 bool Equals(const Device& other) const override {
67 if (other.type_name() != kMyDeviceTypeName) {
68 return false;
69 }
70 return checked_cast<const MyDevice&>(other).value_ == value_;
71 }
72
73 std::shared_ptr<MemoryManager> default_memory_manager() override;
74
value() const75 int value() const { return value_; }
76
allow_copy() const77 bool allow_copy() const { return value_ == kMyDeviceAllowCopy; }
78
allow_view() const79 bool allow_view() const { return value_ == kMyDeviceAllowView; }
80
81 protected:
82 int value_;
83 };
84
85 class MyMemoryManager : public MemoryManager {
86 public:
MyMemoryManager(std::shared_ptr<Device> device)87 explicit MyMemoryManager(std::shared_ptr<Device> device) : MemoryManager(device) {}
88
allow_copy() const89 bool allow_copy() const {
90 return checked_cast<const MyDevice&>(*device()).allow_copy();
91 }
92
allow_view() const93 bool allow_view() const {
94 return checked_cast<const MyDevice&>(*device()).allow_view();
95 }
96
GetBufferReader(std::shared_ptr<Buffer> buf)97 Result<std::shared_ptr<io::RandomAccessFile>> GetBufferReader(
98 std::shared_ptr<Buffer> buf) override {
99 return Status::NotImplemented("");
100 }
101
AllocateBuffer(int64_t size)102 Result<std::shared_ptr<Buffer>> AllocateBuffer(int64_t size) override {
103 return Status::NotImplemented("");
104 }
105
GetBufferWriter(std::shared_ptr<Buffer> buf)106 Result<std::shared_ptr<io::OutputStream>> GetBufferWriter(
107 std::shared_ptr<Buffer> buf) override {
108 return Status::NotImplemented("");
109 }
110
111 protected:
112 Result<std::shared_ptr<Buffer>> CopyBufferFrom(
113 const std::shared_ptr<Buffer>& buf,
114 const std::shared_ptr<MemoryManager>& from) override;
115 Result<std::shared_ptr<Buffer>> CopyBufferTo(
116 const std::shared_ptr<Buffer>& buf,
117 const std::shared_ptr<MemoryManager>& to) override;
118 Result<std::shared_ptr<Buffer>> ViewBufferFrom(
119 const std::shared_ptr<Buffer>& buf,
120 const std::shared_ptr<MemoryManager>& from) override;
121 Result<std::shared_ptr<Buffer>> ViewBufferTo(
122 const std::shared_ptr<Buffer>& buf,
123 const std::shared_ptr<MemoryManager>& to) override;
124 };
125
126 class MyBuffer : public Buffer {
127 public:
MyBuffer(std::shared_ptr<MemoryManager> mm,const std::shared_ptr<Buffer> & parent)128 MyBuffer(std::shared_ptr<MemoryManager> mm, const std::shared_ptr<Buffer>& parent)
129 : Buffer(parent->data(), parent->size()) {
130 parent_ = parent;
131 SetMemoryManager(mm);
132 }
133 };
134
default_memory_manager()135 std::shared_ptr<MemoryManager> MyDevice::default_memory_manager() {
136 return std::make_shared<MyMemoryManager>(shared_from_this());
137 }
138
CopyBufferFrom(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & from)139 Result<std::shared_ptr<Buffer>> MyMemoryManager::CopyBufferFrom(
140 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
141 if (!allow_copy()) {
142 return nullptr;
143 }
144 if (from->is_cpu()) {
145 // CPU to MyDevice:
146 // 1. CPU to CPU
147 ARROW_ASSIGN_OR_RAISE(auto dest,
148 MemoryManager::CopyBuffer(buf, default_cpu_memory_manager()));
149 // 2. Wrap CPU buffer result
150 return std::make_shared<MyBuffer>(shared_from_this(), dest);
151 }
152 return nullptr;
153 }
154
CopyBufferTo(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & to)155 Result<std::shared_ptr<Buffer>> MyMemoryManager::CopyBufferTo(
156 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
157 if (!allow_copy()) {
158 return nullptr;
159 }
160 if (to->is_cpu() && buf->parent()) {
161 // MyDevice to CPU
162 return MemoryManager::CopyBuffer(buf->parent(), to);
163 }
164 return nullptr;
165 }
166
ViewBufferFrom(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & from)167 Result<std::shared_ptr<Buffer>> MyMemoryManager::ViewBufferFrom(
168 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
169 if (!allow_view()) {
170 return nullptr;
171 }
172 if (from->is_cpu()) {
173 // CPU on MyDevice: wrap CPU buffer
174 return std::make_shared<MyBuffer>(shared_from_this(), buf);
175 }
176 return nullptr;
177 }
178
ViewBufferTo(const std::shared_ptr<Buffer> & buf,const std::shared_ptr<MemoryManager> & to)179 Result<std::shared_ptr<Buffer>> MyMemoryManager::ViewBufferTo(
180 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
181 if (!allow_view()) {
182 return nullptr;
183 }
184 if (to->is_cpu() && buf->parent()) {
185 // MyDevice on CPU: unwrap buffer
186 return buf->parent();
187 }
188 return nullptr;
189 }
190
191 // Like AssertBufferEqual, but doesn't call Buffer::data()
AssertMyBufferEqual(const Buffer & buffer,util::string_view expected)192 void AssertMyBufferEqual(const Buffer& buffer, util::string_view expected) {
193 ASSERT_EQ(util::string_view(buffer), expected);
194 }
195
AssertIsCPUBuffer(const Buffer & buf)196 void AssertIsCPUBuffer(const Buffer& buf) {
197 ASSERT_TRUE(buf.is_cpu());
198 ASSERT_EQ(*buf.device(), *CPUDevice::Instance());
199 }
200
201 class TestDevice : public ::testing::Test {
202 public:
SetUp()203 void SetUp() {
204 cpu_device_ = CPUDevice::Instance();
205 my_copy_device_ = std::make_shared<MyDevice>(kMyDeviceAllowCopy);
206 my_view_device_ = std::make_shared<MyDevice>(kMyDeviceAllowView);
207 my_other_device_ = std::make_shared<MyDevice>(kMyDeviceDisallowCopyView);
208
209 cpu_mm_ = cpu_device_->default_memory_manager();
210 my_copy_mm_ = my_copy_device_->default_memory_manager();
211 my_view_mm_ = my_view_device_->default_memory_manager();
212 my_other_mm_ = my_other_device_->default_memory_manager();
213
214 cpu_src_ = Buffer::FromString("some data");
215 my_copy_src_ = std::make_shared<MyBuffer>(my_copy_mm_, cpu_src_);
216 my_view_src_ = std::make_shared<MyBuffer>(my_view_mm_, cpu_src_);
217 my_other_src_ = std::make_shared<MyBuffer>(my_other_mm_, cpu_src_);
218 }
219
220 protected:
221 std::shared_ptr<Device> cpu_device_, my_copy_device_, my_view_device_, my_other_device_;
222 std::shared_ptr<MemoryManager> cpu_mm_, my_copy_mm_, my_view_mm_, my_other_mm_;
223 std::shared_ptr<Buffer> cpu_src_, my_copy_src_, my_view_src_, my_other_src_;
224 };
225
TEST_F(TestDevice,Basics)226 TEST_F(TestDevice, Basics) {
227 ASSERT_TRUE(cpu_device_->is_cpu());
228
229 ASSERT_EQ(*cpu_device_, *cpu_device_);
230 ASSERT_EQ(*my_copy_device_, *my_copy_device_);
231 ASSERT_NE(*cpu_device_, *my_copy_device_);
232 ASSERT_NE(*my_copy_device_, *cpu_device_);
233
234 ASSERT_TRUE(cpu_mm_->is_cpu());
235 ASSERT_FALSE(my_copy_mm_->is_cpu());
236 ASSERT_FALSE(my_other_mm_->is_cpu());
237 }
238
TEST_F(TestDevice,Copy)239 TEST_F(TestDevice, Copy) {
240 // CPU-to-CPU
241 ASSERT_OK_AND_ASSIGN(auto buffer, MemoryManager::CopyBuffer(cpu_src_, cpu_mm_));
242 ASSERT_EQ(buffer->device(), cpu_device_);
243 ASSERT_TRUE(buffer->is_cpu());
244 ASSERT_NE(buffer->address(), cpu_src_->address());
245 ASSERT_NE(buffer->data(), nullptr);
246 AssertBufferEqual(*buffer, "some data");
247
248 // CPU-to-device
249 ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(cpu_src_, my_copy_mm_));
250 ASSERT_EQ(buffer->device(), my_copy_device_);
251 ASSERT_FALSE(buffer->is_cpu());
252 ASSERT_NE(buffer->address(), cpu_src_->address());
253 #ifdef NDEBUG
254 ASSERT_EQ(buffer->data(), nullptr);
255 #endif
256 AssertMyBufferEqual(*buffer, "some data");
257
258 // Device-to-CPU
259 ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_copy_src_, cpu_mm_));
260 ASSERT_EQ(buffer->device(), cpu_device_);
261 ASSERT_TRUE(buffer->is_cpu());
262 ASSERT_NE(buffer->address(), my_copy_src_->address());
263 ASSERT_NE(buffer->data(), nullptr);
264 AssertBufferEqual(*buffer, "some data");
265
266 // Device-to-device with an intermediate CPU copy
267 ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_copy_src_, my_copy_mm_));
268 ASSERT_EQ(buffer->device(), my_copy_device_);
269 ASSERT_FALSE(buffer->is_cpu());
270 ASSERT_NE(buffer->address(), my_copy_src_->address());
271 #ifdef NDEBUG
272 ASSERT_EQ(buffer->data(), nullptr);
273 #endif
274 AssertMyBufferEqual(*buffer, "some data");
275
276 // Device-to-device with an intermediate view on CPU, then a copy from CPU to device
277 ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_view_src_, my_copy_mm_));
278 ASSERT_EQ(buffer->device(), my_copy_device_);
279 ASSERT_FALSE(buffer->is_cpu());
280 ASSERT_NE(buffer->address(), my_copy_src_->address());
281 #ifdef NDEBUG
282 ASSERT_EQ(buffer->data(), nullptr);
283 #endif
284 AssertMyBufferEqual(*buffer, "some data");
285
286 ASSERT_RAISES(NotImplemented, MemoryManager::CopyBuffer(cpu_src_, my_other_mm_));
287 ASSERT_RAISES(NotImplemented, MemoryManager::CopyBuffer(my_other_src_, cpu_mm_));
288 }
289
TEST_F(TestDevice,View)290 TEST_F(TestDevice, View) {
291 // CPU-on-CPU
292 ASSERT_OK_AND_ASSIGN(auto buffer, MemoryManager::ViewBuffer(cpu_src_, cpu_mm_));
293 ASSERT_EQ(buffer->device(), cpu_device_);
294 ASSERT_TRUE(buffer->is_cpu());
295 ASSERT_EQ(buffer->address(), cpu_src_->address());
296 ASSERT_NE(buffer->data(), nullptr);
297 AssertBufferEqual(*buffer, "some data");
298
299 // CPU-on-device
300 ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::ViewBuffer(cpu_src_, my_view_mm_));
301 ASSERT_EQ(buffer->device(), my_view_device_);
302 ASSERT_FALSE(buffer->is_cpu());
303 ASSERT_EQ(buffer->address(), cpu_src_->address());
304 #ifdef NDEBUG
305 ASSERT_EQ(buffer->data(), nullptr);
306 #endif
307 AssertMyBufferEqual(*buffer, "some data");
308
309 // Device-on-CPU
310 ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::ViewBuffer(my_view_src_, cpu_mm_));
311 ASSERT_EQ(buffer->device(), cpu_device_);
312 ASSERT_TRUE(buffer->is_cpu());
313 ASSERT_EQ(buffer->address(), my_copy_src_->address());
314 ASSERT_NE(buffer->data(), nullptr);
315 AssertBufferEqual(*buffer, "some data");
316
317 ASSERT_RAISES(NotImplemented, MemoryManager::CopyBuffer(cpu_src_, my_other_mm_));
318 ASSERT_RAISES(NotImplemented, MemoryManager::CopyBuffer(my_other_src_, cpu_mm_));
319 }
320
TEST(TestAllocate,Basics)321 TEST(TestAllocate, Basics) {
322 ASSERT_OK_AND_ASSIGN(auto new_buffer, AllocateBuffer(1024));
323 auto mm = new_buffer->memory_manager();
324 ASSERT_TRUE(mm->is_cpu());
325 ASSERT_EQ(mm.get(), default_cpu_memory_manager().get());
326 auto cpu_mm = checked_pointer_cast<CPUMemoryManager>(mm);
327 ASSERT_EQ(cpu_mm->pool(), default_memory_pool());
328
329 auto pool = std::make_shared<ProxyMemoryPool>(default_memory_pool());
330 ASSERT_OK_AND_ASSIGN(new_buffer, AllocateBuffer(1024, pool.get()));
331 mm = new_buffer->memory_manager();
332 ASSERT_TRUE(mm->is_cpu());
333 cpu_mm = checked_pointer_cast<CPUMemoryManager>(mm);
334 ASSERT_EQ(cpu_mm->pool(), pool.get());
335 new_buffer.reset(); // Destroy before pool
336 }
337
TEST(TestAllocate,Bitmap)338 TEST(TestAllocate, Bitmap) {
339 ASSERT_OK_AND_ASSIGN(auto new_buffer, AllocateBitmap(100));
340 AssertIsCPUBuffer(*new_buffer);
341 EXPECT_GE(new_buffer->size(), 13);
342 EXPECT_EQ(new_buffer->capacity() % 8, 0);
343 }
344
TEST(TestAllocate,EmptyBitmap)345 TEST(TestAllocate, EmptyBitmap) {
346 ASSERT_OK_AND_ASSIGN(auto new_buffer, AllocateEmptyBitmap(100));
347 AssertIsCPUBuffer(*new_buffer);
348 EXPECT_EQ(new_buffer->size(), 13);
349 EXPECT_EQ(new_buffer->capacity() % 8, 0);
350 EXPECT_TRUE(std::all_of(new_buffer->data(), new_buffer->data() + new_buffer->capacity(),
351 [](int8_t byte) { return byte == 0; }));
352 }
353
TEST(TestBuffer,FromStdString)354 TEST(TestBuffer, FromStdString) {
355 std::string val = "hello, world";
356
357 Buffer buf(val);
358 AssertIsCPUBuffer(buf);
359 ASSERT_EQ(0, memcmp(buf.data(), val.c_str(), val.size()));
360 ASSERT_EQ(static_cast<int64_t>(val.size()), buf.size());
361 }
362
TEST(TestBuffer,FromStdStringWithMemory)363 TEST(TestBuffer, FromStdStringWithMemory) {
364 std::string expected = "hello, world";
365 std::shared_ptr<Buffer> buf;
366
367 {
368 std::string temp = "hello, world";
369 buf = Buffer::FromString(temp);
370 AssertIsCPUBuffer(*buf);
371 ASSERT_EQ(0, memcmp(buf->data(), temp.c_str(), temp.size()));
372 ASSERT_EQ(static_cast<int64_t>(temp.size()), buf->size());
373 }
374
375 // Now temp goes out of scope and we check if created buffer
376 // is still valid to make sure it actually owns its space
377 ASSERT_EQ(0, memcmp(buf->data(), expected.c_str(), expected.size()));
378 ASSERT_EQ(static_cast<int64_t>(expected.size()), buf->size());
379 }
380
TEST(TestBuffer,EqualsWithSameContent)381 TEST(TestBuffer, EqualsWithSameContent) {
382 MemoryPool* pool = default_memory_pool();
383 const int32_t bufferSize = 128 * 1024;
384 uint8_t* rawBuffer1;
385 ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer1));
386 memset(rawBuffer1, 12, bufferSize);
387 uint8_t* rawBuffer2;
388 ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer2));
389 memset(rawBuffer2, 12, bufferSize);
390 uint8_t* rawBuffer3;
391 ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer3));
392 memset(rawBuffer3, 3, bufferSize);
393
394 Buffer buffer1(rawBuffer1, bufferSize);
395 Buffer buffer2(rawBuffer2, bufferSize);
396 Buffer buffer3(rawBuffer3, bufferSize);
397 ASSERT_TRUE(buffer1.Equals(buffer2));
398 ASSERT_FALSE(buffer1.Equals(buffer3));
399
400 pool->Free(rawBuffer1, bufferSize);
401 pool->Free(rawBuffer2, bufferSize);
402 pool->Free(rawBuffer3, bufferSize);
403 }
404
TEST(TestBuffer,EqualsWithSameBuffer)405 TEST(TestBuffer, EqualsWithSameBuffer) {
406 MemoryPool* pool = default_memory_pool();
407 const int32_t bufferSize = 128 * 1024;
408 uint8_t* rawBuffer;
409 ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer));
410 memset(rawBuffer, 111, bufferSize);
411
412 Buffer buffer1(rawBuffer, bufferSize);
413 Buffer buffer2(rawBuffer, bufferSize);
414 ASSERT_TRUE(buffer1.Equals(buffer2));
415
416 const int64_t nbytes = bufferSize / 2;
417 Buffer buffer3(rawBuffer, nbytes);
418 ASSERT_TRUE(buffer1.Equals(buffer3, nbytes));
419 ASSERT_FALSE(buffer1.Equals(buffer3, nbytes + 1));
420
421 pool->Free(rawBuffer, bufferSize);
422 }
423
TEST(TestBuffer,CopySlice)424 TEST(TestBuffer, CopySlice) {
425 std::string data_str = "some data to copy";
426
427 auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
428
429 Buffer buf(data, data_str.size());
430
431 ASSERT_OK_AND_ASSIGN(auto out, buf.CopySlice(5, 4));
432 AssertIsCPUBuffer(*out);
433
434 Buffer expected(data + 5, 4);
435 ASSERT_TRUE(out->Equals(expected));
436 // assert the padding is zeroed
437 std::vector<uint8_t> zeros(out->capacity() - out->size());
438 ASSERT_EQ(0, memcmp(out->data() + out->size(), zeros.data(), zeros.size()));
439 }
440
TEST(TestBuffer,CopySliceEmpty)441 TEST(TestBuffer, CopySliceEmpty) {
442 auto buf = std::make_shared<Buffer>("");
443 ASSERT_OK_AND_ASSIGN(auto out, buf->CopySlice(0, 0));
444 AssertBufferEqual(*out, "");
445
446 buf = std::make_shared<Buffer>("1234");
447 ASSERT_OK_AND_ASSIGN(out, buf->CopySlice(0, 0));
448 AssertBufferEqual(*out, "");
449 ASSERT_OK_AND_ASSIGN(out, buf->CopySlice(4, 0));
450 AssertBufferEqual(*out, "");
451 }
452
TEST(TestBuffer,ToHexString)453 TEST(TestBuffer, ToHexString) {
454 const uint8_t data_array[] = "\a0hex string\xa9";
455 std::basic_string<uint8_t> data_str = data_array;
456
457 auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
458
459 Buffer buf(data, data_str.size());
460
461 ASSERT_EQ(buf.ToHexString(), std::string("073068657820737472696E67A9"));
462 }
463
TEST(TestBuffer,SliceBuffer)464 TEST(TestBuffer, SliceBuffer) {
465 std::string data_str = "some data to slice";
466
467 auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
468
469 auto buf = std::make_shared<Buffer>(data, data_str.size());
470
471 std::shared_ptr<Buffer> out = SliceBuffer(buf, 5, 4);
472 AssertIsCPUBuffer(*out);
473 Buffer expected(data + 5, 4);
474 ASSERT_TRUE(out->Equals(expected));
475
476 ASSERT_EQ(2, buf.use_count());
477 }
478
TEST(TestMutableBuffer,Wrap)479 TEST(TestMutableBuffer, Wrap) {
480 std::vector<int32_t> values = {1, 2, 3};
481
482 auto buf = MutableBuffer::Wrap(values.data(), values.size());
483 AssertIsCPUBuffer(*buf);
484 reinterpret_cast<int32_t*>(buf->mutable_data())[1] = 4;
485
486 ASSERT_EQ(4, values[1]);
487 }
488
TEST(TestBuffer,FromStringRvalue)489 TEST(TestBuffer, FromStringRvalue) {
490 std::string expected = "input data";
491
492 std::shared_ptr<Buffer> buffer;
493 {
494 std::string data_str = "input data";
495 buffer = Buffer::FromString(std::move(data_str));
496 AssertIsCPUBuffer(*buffer);
497 }
498
499 ASSERT_FALSE(buffer->is_mutable());
500
501 ASSERT_EQ(0, memcmp(buffer->data(), expected.c_str(), expected.size()));
502 ASSERT_EQ(static_cast<int64_t>(expected.size()), buffer->size());
503 }
504
TEST(TestBuffer,SliceMutableBuffer)505 TEST(TestBuffer, SliceMutableBuffer) {
506 std::string data_str = "some data to slice";
507 auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
508
509 ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, AllocateBuffer(50));
510
511 memcpy(buffer->mutable_data(), data, data_str.size());
512
513 std::shared_ptr<Buffer> slice = SliceMutableBuffer(buffer, 5, 10);
514 AssertIsCPUBuffer(*slice);
515 ASSERT_TRUE(slice->is_mutable());
516 ASSERT_EQ(10, slice->size());
517
518 Buffer expected(data + 5, 10);
519 ASSERT_TRUE(slice->Equals(expected));
520 }
521
TEST(TestBuffer,GetReader)522 TEST(TestBuffer, GetReader) {
523 const std::string data_str = "some data to read";
524 auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
525
526 auto buf = std::make_shared<Buffer>(data, data_str.size());
527 ASSERT_OK_AND_ASSIGN(auto reader, Buffer::GetReader(buf));
528 ASSERT_OK_AND_EQ(static_cast<int64_t>(data_str.size()), reader->GetSize());
529 ASSERT_OK_AND_ASSIGN(auto read_buf, reader->ReadAt(5, 4));
530 AssertBufferEqual(*read_buf, "data");
531 }
532
TEST(TestBuffer,GetWriter)533 TEST(TestBuffer, GetWriter) {
534 ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buf, AllocateBuffer(9));
535 ASSERT_OK_AND_ASSIGN(auto writer, Buffer::GetWriter(buf));
536 ASSERT_OK(writer->Write(reinterpret_cast<const uint8_t*>("some data"), 9));
537 AssertBufferEqual(*buf, "some data");
538
539 // Non-mutable buffer
540 buf = std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>("xxx"), 3);
541 ASSERT_RAISES(Invalid, Buffer::GetWriter(buf));
542 }
543
544 template <typename AllocateFunction>
TestZeroSizeAllocateBuffer(MemoryPool * pool,AllocateFunction && allocate_func)545 void TestZeroSizeAllocateBuffer(MemoryPool* pool, AllocateFunction&& allocate_func) {
546 auto allocated_bytes = pool->bytes_allocated();
547 {
548 std::shared_ptr<Buffer> buffer, buffer2;
549
550 ASSERT_OK(allocate_func(pool, 0, &buffer));
551 AssertIsCPUBuffer(*buffer);
552 ASSERT_EQ(buffer->size(), 0);
553 // Even 0-sized buffers should not have a null data pointer
554 auto data = buffer->data();
555 ASSERT_NE(data, nullptr);
556 ASSERT_EQ(buffer->mutable_data(), data);
557
558 // As an optimization, another 0-size buffer should share the same memory "area"
559 ASSERT_OK(allocate_func(pool, 0, &buffer2));
560 AssertIsCPUBuffer(*buffer2);
561 ASSERT_EQ(buffer2->size(), 0);
562 ASSERT_EQ(buffer2->data(), data);
563
564 ASSERT_GE(pool->bytes_allocated(), allocated_bytes);
565 }
566 ASSERT_EQ(pool->bytes_allocated(), allocated_bytes);
567 }
568
TEST(TestAllocateBuffer,ZeroSize)569 TEST(TestAllocateBuffer, ZeroSize) {
570 MemoryPool* pool = default_memory_pool();
571 auto allocate_func = [](MemoryPool* pool, int64_t size, std::shared_ptr<Buffer>* out) {
572 return AllocateBuffer(size, pool).Value(out);
573 };
574 TestZeroSizeAllocateBuffer(pool, allocate_func);
575 }
576
TEST(TestAllocateResizableBuffer,ZeroSize)577 TEST(TestAllocateResizableBuffer, ZeroSize) {
578 MemoryPool* pool = default_memory_pool();
579 auto allocate_func = [](MemoryPool* pool, int64_t size, std::shared_ptr<Buffer>* out) {
580 ARROW_ASSIGN_OR_RAISE(auto resizable, AllocateResizableBuffer(size, pool));
581 *out = std::move(resizable);
582 return Status::OK();
583 };
584 TestZeroSizeAllocateBuffer(pool, allocate_func);
585 }
586
TEST(TestAllocateResizableBuffer,ZeroResize)587 TEST(TestAllocateResizableBuffer, ZeroResize) {
588 MemoryPool* pool = default_memory_pool();
589 auto allocated_bytes = pool->bytes_allocated();
590 {
591 std::shared_ptr<ResizableBuffer> buffer;
592
593 ASSERT_OK_AND_ASSIGN(buffer, AllocateResizableBuffer(1000, pool));
594 ASSERT_EQ(buffer->size(), 1000);
595 ASSERT_NE(buffer->data(), nullptr);
596 ASSERT_EQ(buffer->mutable_data(), buffer->data());
597
598 ASSERT_GE(pool->bytes_allocated(), allocated_bytes + 1000);
599
600 ASSERT_OK(buffer->Resize(0));
601 ASSERT_NE(buffer->data(), nullptr);
602 ASSERT_EQ(buffer->mutable_data(), buffer->data());
603
604 ASSERT_GE(pool->bytes_allocated(), allocated_bytes);
605 ASSERT_LT(pool->bytes_allocated(), allocated_bytes + 1000);
606 }
607 ASSERT_EQ(pool->bytes_allocated(), allocated_bytes);
608 }
609
TEST(TestBufferBuilder,ResizeReserve)610 TEST(TestBufferBuilder, ResizeReserve) {
611 const std::string data = "some data";
612 auto data_ptr = data.c_str();
613
614 BufferBuilder builder;
615
616 ASSERT_OK(builder.Append(data_ptr, 9));
617 ASSERT_EQ(9, builder.length());
618
619 ASSERT_OK(builder.Resize(128));
620 ASSERT_EQ(128, builder.capacity());
621
622 // Do not shrink to fit
623 ASSERT_OK(builder.Resize(64, false));
624 ASSERT_EQ(128, builder.capacity());
625
626 // Shrink to fit
627 ASSERT_OK(builder.Resize(64));
628 ASSERT_EQ(64, builder.capacity());
629
630 // Reserve elements
631 ASSERT_OK(builder.Reserve(60));
632 ASSERT_EQ(128, builder.capacity());
633 }
634
635 template <typename T>
636 class TypedTestBufferBuilder : public ::testing::Test {};
637
638 using BufferBuilderElements = ::testing::Types<int16_t, uint32_t, double>;
639
640 TYPED_TEST_SUITE(TypedTestBufferBuilder, BufferBuilderElements);
641
TYPED_TEST(TypedTestBufferBuilder,BasicTypedBufferBuilderUsage)642 TYPED_TEST(TypedTestBufferBuilder, BasicTypedBufferBuilderUsage) {
643 TypedBufferBuilder<TypeParam> builder;
644
645 ASSERT_OK(builder.Append(static_cast<TypeParam>(0)));
646 ASSERT_EQ(builder.length(), 1);
647 ASSERT_EQ(builder.capacity(), 64 / sizeof(TypeParam));
648
649 constexpr int nvalues = 4;
650 TypeParam values[nvalues];
651 for (int i = 0; i != nvalues; ++i) {
652 values[i] = static_cast<TypeParam>(i);
653 }
654 ASSERT_OK(builder.Append(values, nvalues));
655 ASSERT_EQ(builder.length(), nvalues + 1);
656
657 std::shared_ptr<Buffer> built;
658 ASSERT_OK(builder.Finish(&built));
659 AssertIsCPUBuffer(*built);
660
661 auto data = reinterpret_cast<const TypeParam*>(built->data());
662 ASSERT_EQ(data[0], static_cast<TypeParam>(0));
663 for (auto value : values) {
664 ++data;
665 ASSERT_EQ(*data, value);
666 }
667 }
668
TYPED_TEST(TypedTestBufferBuilder,AppendCopies)669 TYPED_TEST(TypedTestBufferBuilder, AppendCopies) {
670 TypedBufferBuilder<TypeParam> builder;
671
672 ASSERT_OK(builder.Append(13, static_cast<TypeParam>(1)));
673 ASSERT_OK(builder.Append(17, static_cast<TypeParam>(0)));
674 ASSERT_EQ(builder.length(), 13 + 17);
675
676 std::shared_ptr<Buffer> built;
677 ASSERT_OK(builder.Finish(&built));
678
679 auto data = reinterpret_cast<const TypeParam*>(built->data());
680 for (int i = 0; i != 13 + 17; ++i, ++data) {
681 ASSERT_EQ(*data, static_cast<TypeParam>(i < 13)) << "index = " << i;
682 }
683 }
684
TEST(TestBufferBuilder,BasicBoolBufferBuilderUsage)685 TEST(TestBufferBuilder, BasicBoolBufferBuilderUsage) {
686 TypedBufferBuilder<bool> builder;
687
688 ASSERT_OK(builder.Append(false));
689 ASSERT_EQ(builder.length(), 1);
690 ASSERT_EQ(builder.capacity(), 64 * 8);
691
692 constexpr int nvalues = 4;
693 uint8_t values[nvalues];
694 for (int i = 0; i != nvalues; ++i) {
695 values[i] = static_cast<uint8_t>(i);
696 }
697 ASSERT_OK(builder.Append(values, nvalues));
698 ASSERT_EQ(builder.length(), nvalues + 1);
699
700 ASSERT_EQ(builder.false_count(), 2);
701
702 std::shared_ptr<Buffer> built;
703 ASSERT_OK(builder.Finish(&built));
704 AssertIsCPUBuffer(*built);
705
706 ASSERT_EQ(BitUtil::GetBit(built->data(), 0), false);
707 for (int i = 0; i != nvalues; ++i) {
708 ASSERT_EQ(BitUtil::GetBit(built->data(), i + 1), static_cast<bool>(values[i]));
709 }
710
711 ASSERT_EQ(built->size(), BitUtil::BytesForBits(nvalues + 1));
712 }
713
TEST(TestBufferBuilder,BoolBufferBuilderAppendCopies)714 TEST(TestBufferBuilder, BoolBufferBuilderAppendCopies) {
715 TypedBufferBuilder<bool> builder;
716
717 ASSERT_OK(builder.Append(13, true));
718 ASSERT_OK(builder.Append(17, false));
719 ASSERT_EQ(builder.length(), 13 + 17);
720 ASSERT_EQ(builder.capacity(), 64 * 8);
721 ASSERT_EQ(builder.false_count(), 17);
722
723 std::shared_ptr<Buffer> built;
724 ASSERT_OK(builder.Finish(&built));
725 AssertIsCPUBuffer(*built);
726
727 for (int i = 0; i != 13 + 17; ++i) {
728 EXPECT_EQ(BitUtil::GetBit(built->data(), i), i < 13) << "index = " << i;
729 }
730
731 ASSERT_EQ(built->size(), BitUtil::BytesForBits(13 + 17));
732 }
733
734 template <typename T>
735 class TypedTestBuffer : public ::testing::Test {};
736
737 using BufferPtrs =
738 ::testing::Types<std::shared_ptr<ResizableBuffer>, std::unique_ptr<ResizableBuffer>>;
739
740 TYPED_TEST_SUITE(TypedTestBuffer, BufferPtrs);
741
TYPED_TEST(TypedTestBuffer,IsMutableFlag)742 TYPED_TEST(TypedTestBuffer, IsMutableFlag) {
743 Buffer buf(nullptr, 0);
744
745 ASSERT_FALSE(buf.is_mutable());
746
747 MutableBuffer mbuf(nullptr, 0);
748 ASSERT_TRUE(mbuf.is_mutable());
749 AssertIsCPUBuffer(mbuf);
750
751 TypeParam pool_buf;
752 ASSERT_OK_AND_ASSIGN(pool_buf, AllocateResizableBuffer(0));
753 ASSERT_TRUE(pool_buf->is_mutable());
754 AssertIsCPUBuffer(*pool_buf);
755 }
756
TYPED_TEST(TypedTestBuffer,Resize)757 TYPED_TEST(TypedTestBuffer, Resize) {
758 TypeParam buf;
759 ASSERT_OK_AND_ASSIGN(buf, AllocateResizableBuffer(0));
760 AssertIsCPUBuffer(*buf);
761
762 ASSERT_EQ(0, buf->size());
763 ASSERT_OK(buf->Resize(100));
764 ASSERT_EQ(100, buf->size());
765 ASSERT_OK(buf->Resize(200));
766 ASSERT_EQ(200, buf->size());
767
768 // Make it smaller, too
769 ASSERT_OK(buf->Resize(50, true));
770 ASSERT_EQ(50, buf->size());
771 // We have actually shrunken in size
772 // The spec requires that capacity is a multiple of 64
773 ASSERT_EQ(64, buf->capacity());
774
775 // Resize to a larger capacity again to test shrink_to_fit = false
776 ASSERT_OK(buf->Resize(100));
777 ASSERT_EQ(128, buf->capacity());
778 ASSERT_OK(buf->Resize(50, false));
779 ASSERT_EQ(128, buf->capacity());
780 }
781
TYPED_TEST(TypedTestBuffer,TypedResize)782 TYPED_TEST(TypedTestBuffer, TypedResize) {
783 TypeParam buf;
784 ASSERT_OK_AND_ASSIGN(buf, AllocateResizableBuffer(0));
785
786 ASSERT_EQ(0, buf->size());
787 ASSERT_OK(buf->template TypedResize<double>(100));
788 ASSERT_EQ(800, buf->size());
789 ASSERT_OK(buf->template TypedResize<double>(200));
790 ASSERT_EQ(1600, buf->size());
791
792 ASSERT_OK(buf->template TypedResize<double>(50, true));
793 ASSERT_EQ(400, buf->size());
794 ASSERT_EQ(448, buf->capacity());
795
796 ASSERT_OK(buf->template TypedResize<double>(100));
797 ASSERT_EQ(832, buf->capacity());
798 ASSERT_OK(buf->template TypedResize<double>(50, false));
799 ASSERT_EQ(832, buf->capacity());
800 }
801
TYPED_TEST(TypedTestBuffer,ResizeOOM)802 TYPED_TEST(TypedTestBuffer, ResizeOOM) {
803 // This test doesn't play nice with AddressSanitizer
804 #ifndef ADDRESS_SANITIZER
805 // realloc fails, even though there may be no explicit limit
806 TypeParam buf;
807 ASSERT_OK_AND_ASSIGN(buf, AllocateResizableBuffer(0));
808 ASSERT_OK(buf->Resize(100));
809 int64_t to_alloc = std::min<uint64_t>(std::numeric_limits<int64_t>::max(),
810 std::numeric_limits<size_t>::max());
811 // subtract 63 to prevent overflow after the size is aligned
812 to_alloc -= 63;
813 ASSERT_RAISES(OutOfMemory, buf->Resize(to_alloc));
814 #endif
815 }
816
817 } // namespace arrow
818