1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <assert.h>
19 #include <signal.h>
20 #include <stdlib.h>
21 #include <sys/time.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24
25 #include <memory>
26 #include <thread>
27
28 #include <gtest/gtest.h>
29
30 #include "arrow/testing/gtest_util.h"
31 #include "arrow/util/io_util.h"
32
33 #include "plasma/client.h"
34 #include "plasma/common.h"
35 #include "plasma/plasma.h"
36 #include "plasma/protocol.h"
37 #include "plasma/test_util.h"
38
39 namespace plasma {
40
41 using arrow::internal::TemporaryDir;
42
43 std::string test_executable; // NOLINT
44
AssertObjectBufferEqual(const ObjectBuffer & object_buffer,const std::vector<uint8_t> & metadata,const std::vector<uint8_t> & data)45 void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
46 const std::vector<uint8_t>& metadata,
47 const std::vector<uint8_t>& data) {
48 arrow::AssertBufferEqual(*object_buffer.metadata, metadata);
49 arrow::AssertBufferEqual(*object_buffer.data, data);
50 }
51
52 class TestPlasmaStore : public ::testing::Test {
53 public:
54 // TODO(pcm): At the moment, stdout of the test gets mixed up with
55 // stdout of the object store. Consider changing that.
56
SetUp()57 void SetUp() {
58 ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("cli-test-"));
59 store_socket_name_ = temp_dir_->path().ToString() + "store";
60
61 std::string plasma_directory =
62 test_executable.substr(0, test_executable.find_last_of("/"));
63 std::string plasma_command =
64 plasma_directory + "/plasma-store-server -m 10000000 -s " + store_socket_name_ +
65 " 1> /dev/null 2> /dev/null & " + "echo $! > " + store_socket_name_ + ".pid";
66 PLASMA_CHECK_SYSTEM(system(plasma_command.c_str()));
67 ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));
68 ARROW_CHECK_OK(client2_.Connect(store_socket_name_, ""));
69 }
70
TearDown()71 virtual void TearDown() {
72 ARROW_CHECK_OK(client_.Disconnect());
73 ARROW_CHECK_OK(client2_.Disconnect());
74 // Kill plasma_store process that we started
75 #ifdef COVERAGE_BUILD
76 // Ask plasma_store to exit gracefully and give it time to write out
77 // coverage files
78 std::string plasma_term_command =
79 "kill -TERM `cat " + store_socket_name_ + ".pid` || exit 0";
80 PLASMA_CHECK_SYSTEM(system(plasma_term_command.c_str()));
81 std::this_thread::sleep_for(std::chrono::milliseconds(200));
82 #endif
83 std::string plasma_kill_command =
84 "kill -KILL `cat " + store_socket_name_ + ".pid` || exit 0";
85 PLASMA_CHECK_SYSTEM(system(plasma_kill_command.c_str()));
86 }
87
CreateObject(PlasmaClient & client,const ObjectID & object_id,const std::vector<uint8_t> & metadata,const std::vector<uint8_t> & data,bool release=true)88 void CreateObject(PlasmaClient& client, const ObjectID& object_id,
89 const std::vector<uint8_t>& metadata,
90 const std::vector<uint8_t>& data, bool release = true) {
91 std::shared_ptr<Buffer> data_buffer;
92 ARROW_CHECK_OK(client.Create(object_id, data.size(), metadata.data(), metadata.size(),
93 &data_buffer));
94 for (size_t i = 0; i < data.size(); i++) {
95 data_buffer->mutable_data()[i] = data[i];
96 }
97 ARROW_CHECK_OK(client.Seal(object_id));
98 if (release) {
99 ARROW_CHECK_OK(client.Release(object_id));
100 }
101 }
102
103 protected:
104 PlasmaClient client_;
105 PlasmaClient client2_;
106 std::unique_ptr<TemporaryDir> temp_dir_;
107 std::string store_socket_name_;
108 };
109
TEST_F(TestPlasmaStore,NewSubscriberTest)110 TEST_F(TestPlasmaStore, NewSubscriberTest) {
111 PlasmaClient local_client, local_client2;
112
113 ARROW_CHECK_OK(local_client.Connect(store_socket_name_, ""));
114 ARROW_CHECK_OK(local_client2.Connect(store_socket_name_, ""));
115
116 ObjectID object_id = random_object_id();
117
118 // Test for the object being in local Plasma store.
119 // First create object.
120 int64_t data_size = 100;
121 uint8_t metadata[] = {5};
122 int64_t metadata_size = sizeof(metadata);
123 std::shared_ptr<Buffer> data;
124 ARROW_CHECK_OK(
125 local_client.Create(object_id, data_size, metadata, metadata_size, &data));
126 ARROW_CHECK_OK(local_client.Seal(object_id));
127
128 // Test that new subscriber client2 can receive notifications about existing objects.
129 int fd = -1;
130 ARROW_CHECK_OK(local_client2.Subscribe(&fd));
131 ASSERT_GT(fd, 0);
132
133 ObjectID object_id2 = random_object_id();
134 int64_t data_size2 = 0;
135 int64_t metadata_size2 = 0;
136 ARROW_CHECK_OK(
137 local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2));
138 ASSERT_EQ(object_id, object_id2);
139 ASSERT_EQ(data_size, data_size2);
140 ASSERT_EQ(metadata_size, metadata_size2);
141
142 // Delete the object.
143 ARROW_CHECK_OK(local_client.Release(object_id));
144 ARROW_CHECK_OK(local_client.Delete(object_id));
145
146 ARROW_CHECK_OK(
147 local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2));
148 ASSERT_EQ(object_id, object_id2);
149 ASSERT_EQ(-1, data_size2);
150 ASSERT_EQ(-1, metadata_size2);
151
152 ARROW_CHECK_OK(local_client2.Disconnect());
153 ARROW_CHECK_OK(local_client.Disconnect());
154 }
155
TEST_F(TestPlasmaStore,BatchNotificationTest)156 TEST_F(TestPlasmaStore, BatchNotificationTest) {
157 PlasmaClient local_client, local_client2;
158
159 ARROW_CHECK_OK(local_client.Connect(store_socket_name_, ""));
160 ARROW_CHECK_OK(local_client2.Connect(store_socket_name_, ""));
161
162 int fd = -1;
163 ARROW_CHECK_OK(local_client2.Subscribe(&fd));
164 ASSERT_GT(fd, 0);
165
166 ObjectID object_id1 = random_object_id();
167 ObjectID object_id2 = random_object_id();
168
169 std::vector<ObjectID> object_ids = {object_id1, object_id2};
170
171 std::vector<std::string> data = {"hello", "world!"};
172 std::vector<std::string> metadata = {"1", "23"};
173 ARROW_CHECK_OK(local_client.CreateAndSealBatch(object_ids, data, metadata));
174
175 ObjectID object_id = random_object_id();
176 int64_t data_size = 0;
177 int64_t metadata_size = 0;
178 ARROW_CHECK_OK(
179 local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size));
180 ASSERT_EQ(object_id, object_id1);
181 ASSERT_EQ(data_size, 5);
182 ASSERT_EQ(metadata_size, 1);
183
184 ARROW_CHECK_OK(
185 local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size));
186 ASSERT_EQ(object_id, object_id2);
187 ASSERT_EQ(data_size, 6);
188 ASSERT_EQ(metadata_size, 2);
189
190 ARROW_CHECK_OK(local_client2.Disconnect());
191 ARROW_CHECK_OK(local_client.Disconnect());
192 }
193
TEST_F(TestPlasmaStore,SealErrorsTest)194 TEST_F(TestPlasmaStore, SealErrorsTest) {
195 ObjectID object_id = random_object_id();
196
197 Status result = client_.Seal(object_id);
198 ASSERT_TRUE(IsPlasmaObjectNotFound(result));
199
200 // Create object.
201 std::vector<uint8_t> data(100, 0);
202 CreateObject(client_, object_id, {42}, data, false);
203
204 // Trying to seal it again.
205 result = client_.Seal(object_id);
206 ASSERT_TRUE(IsPlasmaObjectAlreadySealed(result));
207 ARROW_CHECK_OK(client_.Release(object_id));
208 }
209
TEST_F(TestPlasmaStore,SetQuotaBasicTest)210 TEST_F(TestPlasmaStore, SetQuotaBasicTest) {
211 bool has_object = false;
212 ObjectID id1 = random_object_id();
213 ObjectID id2 = random_object_id();
214
215 ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024));
216 std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
217
218 // First object fits
219 CreateObject(client_, id1, {42}, big_data, true);
220 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
221 ASSERT_TRUE(has_object);
222
223 // Evicts first object
224 CreateObject(client_, id2, {42}, big_data, true);
225 ARROW_CHECK_OK(client_.Contains(id2, &has_object));
226 ASSERT_TRUE(has_object);
227 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
228 ASSERT_FALSE(has_object);
229
230 // Too big to fit in quota at all
231 std::shared_ptr<Buffer> data_buffer;
232 ASSERT_FALSE(
233 client_.Create(random_object_id(), 7 * 1024 * 1024, {}, 0, &data_buffer).ok());
234 ASSERT_TRUE(
235 client_.Create(random_object_id(), 4 * 1024 * 1024, {}, 0, &data_buffer).ok());
236 }
237
TEST_F(TestPlasmaStore,SetQuotaProvidesIsolationFromOtherClients)238 TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) {
239 bool has_object = false;
240 ObjectID id1 = random_object_id();
241 ObjectID id2 = random_object_id();
242
243 std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
244
245 // First object, created without quota
246 CreateObject(client_, id1, {42}, big_data, true);
247 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
248 ASSERT_TRUE(has_object);
249
250 // Second client creates a bunch of objects
251 for (int i = 0; i < 10; i++) {
252 CreateObject(client2_, random_object_id(), {42}, big_data, true);
253 }
254
255 // First client's object is evicted
256 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
257 ASSERT_FALSE(has_object);
258
259 // Try again with quota enabled
260 ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024));
261 CreateObject(client_, id2, {42}, big_data, true);
262 ARROW_CHECK_OK(client_.Contains(id2, &has_object));
263 ASSERT_TRUE(has_object);
264
265 // Second client creates a bunch of objects
266 for (int i = 0; i < 10; i++) {
267 CreateObject(client2_, random_object_id(), {42}, big_data, true);
268 }
269
270 // First client's object is not evicted
271 ARROW_CHECK_OK(client_.Contains(id2, &has_object));
272 ASSERT_TRUE(has_object);
273 }
274
TEST_F(TestPlasmaStore,SetQuotaProtectsOtherClients)275 TEST_F(TestPlasmaStore, SetQuotaProtectsOtherClients) {
276 bool has_object = false;
277 ObjectID id1 = random_object_id();
278
279 std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
280
281 // First client has no quota
282 CreateObject(client_, id1, {42}, big_data, true);
283 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
284 ASSERT_TRUE(has_object);
285
286 // Second client creates a bunch of objects under a quota
287 ARROW_CHECK_OK(client2_.SetClientOptions("client2", 5 * 1024 * 1024));
288 for (int i = 0; i < 10; i++) {
289 CreateObject(client2_, random_object_id(), {42}, big_data, true);
290 }
291
292 // First client's object is NOT evicted
293 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
294 ASSERT_TRUE(has_object);
295 }
296
TEST_F(TestPlasmaStore,SetQuotaCannotExceedSeventyPercentMemory)297 TEST_F(TestPlasmaStore, SetQuotaCannotExceedSeventyPercentMemory) {
298 ASSERT_FALSE(client_.SetClientOptions("client1", 8 * 1024 * 1024).ok());
299 ASSERT_TRUE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok());
300 // cannot set quota twice
301 ASSERT_FALSE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok());
302 // cannot exceed 70% summed
303 ASSERT_FALSE(client2_.SetClientOptions("client2", 3 * 1024 * 1024).ok());
304 ASSERT_TRUE(client2_.SetClientOptions("client2", 1 * 1024 * 1024).ok());
305 }
306
TEST_F(TestPlasmaStore,SetQuotaDemotesPinnedObjectsToGlobalLRU)307 TEST_F(TestPlasmaStore, SetQuotaDemotesPinnedObjectsToGlobalLRU) {
308 bool has_object = false;
309 ASSERT_TRUE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok());
310
311 ObjectID id1 = random_object_id();
312 ObjectID id2 = random_object_id();
313 std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
314
315 // Quota is not enough to fit both id1 and id2, but global LRU is
316 CreateObject(client_, id1, {42}, big_data, false);
317 CreateObject(client_, id2, {42}, big_data, false);
318 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
319 ASSERT_TRUE(has_object);
320 ARROW_CHECK_OK(client_.Contains(id2, &has_object));
321 ASSERT_TRUE(has_object);
322
323 // Release both objects. Now id1 is in global LRU and id2 is in quota
324 ARROW_CHECK_OK(client_.Release(id1));
325 ARROW_CHECK_OK(client_.Release(id2));
326
327 // This flushes id1 from the object store
328 for (int i = 0; i < 10; i++) {
329 CreateObject(client2_, random_object_id(), {42}, big_data, true);
330 }
331 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
332 ASSERT_FALSE(has_object);
333 ARROW_CHECK_OK(client_.Contains(id2, &has_object));
334 ASSERT_TRUE(has_object);
335 }
336
TEST_F(TestPlasmaStore,SetQuotaDemoteDisconnectToGlobalLRU)337 TEST_F(TestPlasmaStore, SetQuotaDemoteDisconnectToGlobalLRU) {
338 bool has_object = false;
339 PlasmaClient local_client;
340 ARROW_CHECK_OK(local_client.Connect(store_socket_name_, ""));
341 ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024));
342
343 ObjectID id1 = random_object_id();
344 std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
345
346 // First object fits
347 CreateObject(local_client, id1, {42}, big_data, true);
348 for (int i = 0; i < 10; i++) {
349 CreateObject(client_, random_object_id(), {42}, big_data, true);
350 }
351 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
352 ASSERT_TRUE(has_object);
353
354 // Object is still present after disconnect
355 ARROW_CHECK_OK(local_client.Disconnect());
356 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
357 ASSERT_TRUE(has_object);
358
359 // But is eligible for global LRU
360 for (int i = 0; i < 10; i++) {
361 CreateObject(client_, random_object_id(), {42}, big_data, true);
362 }
363 ARROW_CHECK_OK(client_.Contains(id1, &has_object));
364 ASSERT_FALSE(has_object);
365 }
366
TEST_F(TestPlasmaStore,SetQuotaCleanupObjectMetadata)367 TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) {
368 PlasmaClient local_client;
369 ARROW_CHECK_OK(local_client.Connect(store_socket_name_, ""));
370 ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024));
371
372 ObjectID id0 = random_object_id();
373 ObjectID id1 = random_object_id();
374 ObjectID id2 = random_object_id();
375 ObjectID id3 = random_object_id();
376 std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
377 std::vector<uint8_t> small_data(1 * 1024 * 1024, 0);
378 CreateObject(local_client, id0, {42}, small_data, false);
379 CreateObject(local_client, id1, {42}, big_data, true);
380 CreateObject(local_client, id2, {42}, big_data,
381 true); // spills id0 to global, evicts id1
382 CreateObject(local_client, id3, {42}, small_data, false);
383
384 ASSERT_TRUE(client_.DebugString().find("num clients with quota: 1") !=
385 std::string::npos);
386 ASSERT_TRUE(client_.DebugString().find("quota map size: 2") != std::string::npos);
387 ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 1") !=
388 std::string::npos);
389 ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") !=
390 std::string::npos);
391 ASSERT_TRUE(client_.DebugString().find("(local) num objects: 2") != std::string::npos);
392
393 // release id0
394 ARROW_CHECK_OK(local_client.Release(id0));
395 ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 1") !=
396 std::string::npos);
397
398 // delete everything
399 ARROW_CHECK_OK(local_client.Delete(id0));
400 ARROW_CHECK_OK(local_client.Delete(id2));
401 ARROW_CHECK_OK(local_client.Delete(id3));
402 ARROW_CHECK_OK(local_client.Release(id3));
403 ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos);
404 ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") !=
405 std::string::npos);
406 ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") !=
407 std::string::npos);
408 ASSERT_TRUE(client_.DebugString().find("(local) num objects: 0") != std::string::npos);
409
410 ARROW_CHECK_OK(local_client.Disconnect());
411 int tries = 10; // wait for disconnect to complete
412 while (tries > 0 &&
413 client_.DebugString().find("num clients with quota: 0") == std::string::npos) {
414 std::this_thread::sleep_for(std::chrono::milliseconds(200));
415 tries -= 1;
416 }
417 ASSERT_TRUE(client_.DebugString().find("num clients with quota: 0") !=
418 std::string::npos);
419 ASSERT_TRUE(client_.DebugString().find("(global lru) capacity: 10000000") !=
420 std::string::npos);
421 ASSERT_TRUE(client_.DebugString().find("(global lru) used: 0%") != std::string::npos);
422 }
423
TEST_F(TestPlasmaStore,SetQuotaCleanupClientDisconnect)424 TEST_F(TestPlasmaStore, SetQuotaCleanupClientDisconnect) {
425 PlasmaClient local_client;
426 ARROW_CHECK_OK(local_client.Connect(store_socket_name_, ""));
427 ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024));
428
429 ObjectID id1 = random_object_id();
430 ObjectID id2 = random_object_id();
431 ObjectID id3 = random_object_id();
432 std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
433 std::vector<uint8_t> small_data(1 * 1024 * 1024, 0);
434 CreateObject(local_client, id1, {42}, big_data, true);
435 CreateObject(local_client, id2, {42}, big_data, true);
436 CreateObject(local_client, id3, {42}, small_data, false);
437
438 ARROW_CHECK_OK(local_client.Disconnect());
439 int tries = 10; // wait for disconnect to complete
440 while (tries > 0 &&
441 client_.DebugString().find("num clients with quota: 0") == std::string::npos) {
442 std::this_thread::sleep_for(std::chrono::milliseconds(200));
443 tries -= 1;
444 }
445 ASSERT_TRUE(client_.DebugString().find("num clients with quota: 0") !=
446 std::string::npos);
447 ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos);
448 ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") !=
449 std::string::npos);
450 ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 2") !=
451 std::string::npos);
452 ASSERT_TRUE(client_.DebugString().find("(global lru) capacity: 10000000") !=
453 std::string::npos);
454 ASSERT_TRUE(client_.DebugString().find("(global lru) used: 41.9431%") !=
455 std::string::npos);
456 }
457
TEST_F(TestPlasmaStore,RefreshLRUTest)458 TEST_F(TestPlasmaStore, RefreshLRUTest) {
459 bool has_object = false;
460 std::vector<ObjectID> object_ids;
461
462 for (int i = 0; i < 10; ++i) {
463 object_ids.push_back(random_object_id());
464 }
465
466 std::vector<uint8_t> small_data(1 * 1000 * 1000, 0);
467
468 // we can fit ten small objects into the store
469 for (const auto& object_id : object_ids) {
470 CreateObject(client_, object_id, {}, small_data, true);
471 ARROW_CHECK_OK(client_.Contains(object_ids[0], &has_object));
472 ASSERT_TRUE(has_object);
473 }
474
475 ObjectID id = random_object_id();
476 CreateObject(client_, id, {}, small_data, true);
477
478 // the first two objects got evicted (20% of the store)
479 ARROW_CHECK_OK(client_.Contains(object_ids[0], &has_object));
480 ASSERT_FALSE(has_object);
481
482 ARROW_CHECK_OK(client_.Contains(object_ids[1], &has_object));
483 ASSERT_FALSE(has_object);
484
485 ARROW_CHECK_OK(client_.Refresh({object_ids[2], object_ids[3]}));
486
487 id = random_object_id();
488 CreateObject(client_, id, {}, small_data, true);
489 id = random_object_id();
490 CreateObject(client_, id, {}, small_data, true);
491
492 // the refreshed objects are not evicted
493 ARROW_CHECK_OK(client_.Contains(object_ids[2], &has_object));
494 ASSERT_TRUE(has_object);
495 ARROW_CHECK_OK(client_.Contains(object_ids[3], &has_object));
496 ASSERT_TRUE(has_object);
497
498 // the next object in LRU order is evicted
499 ARROW_CHECK_OK(client_.Contains(object_ids[4], &has_object));
500 ASSERT_FALSE(has_object);
501 }
502
TEST_F(TestPlasmaStore,DeleteTest)503 TEST_F(TestPlasmaStore, DeleteTest) {
504 ObjectID object_id = random_object_id();
505
506 // Test for deleting nonexistent object.
507 Status result = client_.Delete(object_id);
508 ARROW_CHECK_OK(result);
509
510 // Test for the object being in local Plasma store.
511 // First create object.
512 int64_t data_size = 100;
513 uint8_t metadata[] = {5};
514 int64_t metadata_size = sizeof(metadata);
515 std::shared_ptr<Buffer> data;
516 ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
517 ARROW_CHECK_OK(client_.Seal(object_id));
518
519 result = client_.Delete(object_id);
520 ARROW_CHECK_OK(result);
521 bool has_object = false;
522 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
523 ASSERT_TRUE(has_object);
524
525 ARROW_CHECK_OK(client_.Release(object_id));
526 // object_id is marked as to-be-deleted, when it is not in use, it will be deleted.
527 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
528 ASSERT_FALSE(has_object);
529 ARROW_CHECK_OK(client_.Delete(object_id));
530 }
531
TEST_F(TestPlasmaStore,DeleteObjectsTest)532 TEST_F(TestPlasmaStore, DeleteObjectsTest) {
533 ObjectID object_id1 = random_object_id();
534 ObjectID object_id2 = random_object_id();
535
536 // Test for deleting nonexistent object.
537 Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
538 ARROW_CHECK_OK(result);
539 // Test for the object being in local Plasma store.
540 // First create object.
541 int64_t data_size = 100;
542 uint8_t metadata[] = {5};
543 int64_t metadata_size = sizeof(metadata);
544 std::shared_ptr<Buffer> data;
545 ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
546 ARROW_CHECK_OK(client_.Seal(object_id1));
547 ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
548 ARROW_CHECK_OK(client_.Seal(object_id2));
549 // Release the ref count of Create function.
550 ARROW_CHECK_OK(client_.Release(object_id1));
551 ARROW_CHECK_OK(client_.Release(object_id2));
552 // Increase the ref count by calling Get using client2_.
553 std::vector<ObjectBuffer> object_buffers;
554 ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers));
555 // Objects are still used by client2_.
556 result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
557 ARROW_CHECK_OK(result);
558 // The object is used and it should not be deleted right now.
559 bool has_object = false;
560 ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
561 ASSERT_TRUE(has_object);
562 ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
563 ASSERT_TRUE(has_object);
564 // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer).
565 // client2_ won't send the release request immediately because the trigger
566 // condition is not reached. The release is only added to release cache.
567 object_buffers.clear();
568 // Delete the objects.
569 result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
570 ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
571 ASSERT_FALSE(has_object);
572 ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
573 ASSERT_FALSE(has_object);
574 }
575
TEST_F(TestPlasmaStore,ContainsTest)576 TEST_F(TestPlasmaStore, ContainsTest) {
577 ObjectID object_id = random_object_id();
578
579 // Test for object nonexistence.
580 bool has_object;
581 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
582 ASSERT_FALSE(has_object);
583
584 // Test for the object being in local Plasma store.
585 // First create object.
586 std::vector<uint8_t> data(100, 0);
587 CreateObject(client_, object_id, {42}, data);
588 std::vector<ObjectBuffer> object_buffers;
589 ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
590 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
591 ASSERT_TRUE(has_object);
592 }
593
TEST_F(TestPlasmaStore,GetTest)594 TEST_F(TestPlasmaStore, GetTest) {
595 std::vector<ObjectBuffer> object_buffers;
596
597 ObjectID object_id = random_object_id();
598
599 // Test for object nonexistence.
600 ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
601 ASSERT_EQ(object_buffers.size(), 1);
602 ASSERT_FALSE(object_buffers[0].metadata);
603 ASSERT_FALSE(object_buffers[0].data);
604 EXPECT_FALSE(client_.IsInUse(object_id));
605
606 // Test for the object being in local Plasma store.
607 // First create object.
608 std::vector<uint8_t> data = {3, 5, 6, 7, 9};
609 CreateObject(client_, object_id, {42}, data);
610 EXPECT_FALSE(client_.IsInUse(object_id));
611
612 object_buffers.clear();
613 ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
614 ASSERT_EQ(object_buffers.size(), 1);
615 ASSERT_EQ(object_buffers[0].device_num, 0);
616 AssertObjectBufferEqual(object_buffers[0], {42}, {3, 5, 6, 7, 9});
617
618 // Metadata keeps object in use
619 {
620 auto metadata = object_buffers[0].metadata;
621 object_buffers.clear();
622 ::arrow::AssertBufferEqual(*metadata, std::string{42});
623 EXPECT_TRUE(client_.IsInUse(object_id));
624 }
625 // Object is automatically released
626 EXPECT_FALSE(client_.IsInUse(object_id));
627 }
628
TEST_F(TestPlasmaStore,LegacyGetTest)629 TEST_F(TestPlasmaStore, LegacyGetTest) {
630 // Test for old non-releasing Get() variant
631 ObjectID object_id = random_object_id();
632 {
633 ObjectBuffer object_buffer;
634
635 // Test for object nonexistence.
636 ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
637 ASSERT_FALSE(object_buffer.metadata);
638 ASSERT_FALSE(object_buffer.data);
639 EXPECT_FALSE(client_.IsInUse(object_id));
640
641 // First create object.
642 std::vector<uint8_t> data = {3, 5, 6, 7, 9};
643 CreateObject(client_, object_id, {42}, data);
644 EXPECT_FALSE(client_.IsInUse(object_id));
645
646 ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
647 AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9});
648 }
649 // Object needs releasing manually
650 EXPECT_TRUE(client_.IsInUse(object_id));
651 ARROW_CHECK_OK(client_.Release(object_id));
652 EXPECT_FALSE(client_.IsInUse(object_id));
653 }
654
TEST_F(TestPlasmaStore,MultipleGetTest)655 TEST_F(TestPlasmaStore, MultipleGetTest) {
656 ObjectID object_id1 = random_object_id();
657 ObjectID object_id2 = random_object_id();
658 std::vector<ObjectID> object_ids = {object_id1, object_id2};
659 std::vector<ObjectBuffer> object_buffers;
660
661 int64_t data_size = 4;
662 uint8_t metadata[] = {5};
663 int64_t metadata_size = sizeof(metadata);
664 std::shared_ptr<Buffer> data;
665 ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
666 data->mutable_data()[0] = 1;
667 ARROW_CHECK_OK(client_.Seal(object_id1));
668
669 ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
670 data->mutable_data()[0] = 2;
671 ARROW_CHECK_OK(client_.Seal(object_id2));
672
673 ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers));
674 ASSERT_EQ(object_buffers[0].data->data()[0], 1);
675 ASSERT_EQ(object_buffers[1].data->data()[0], 2);
676 }
677
TEST_F(TestPlasmaStore,BatchCreateTest)678 TEST_F(TestPlasmaStore, BatchCreateTest) {
679 ObjectID object_id1 = random_object_id();
680 ObjectID object_id2 = random_object_id();
681 std::vector<ObjectID> object_ids = {object_id1, object_id2};
682
683 std::vector<std::string> data = {"hello", "world"};
684 std::vector<std::string> metadata = {"1", "2"};
685
686 ARROW_CHECK_OK(client_.CreateAndSealBatch(object_ids, data, metadata));
687
688 std::vector<ObjectBuffer> object_buffers;
689
690 ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers));
691
692 std::string out1, out2;
693 out1.assign(reinterpret_cast<const char*>(object_buffers[0].data->data()),
694 object_buffers[0].data->size());
695 out2.assign(reinterpret_cast<const char*>(object_buffers[1].data->data()),
696 object_buffers[1].data->size());
697
698 ASSERT_STREQ(out1.c_str(), "hello");
699 ASSERT_STREQ(out2.c_str(), "world");
700 }
701
TEST_F(TestPlasmaStore,AbortTest)702 TEST_F(TestPlasmaStore, AbortTest) {
703 ObjectID object_id = random_object_id();
704 std::vector<ObjectBuffer> object_buffers;
705
706 // Test for object nonexistence.
707 ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
708 ASSERT_FALSE(object_buffers[0].data);
709
710 // Test object abort.
711 // First create object.
712 int64_t data_size = 4;
713 uint8_t metadata[] = {5};
714 int64_t metadata_size = sizeof(metadata);
715 std::shared_ptr<Buffer> data;
716 uint8_t* data_ptr;
717 ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
718 data_ptr = data->mutable_data();
719 // Write some data.
720 for (int64_t i = 0; i < data_size / 2; i++) {
721 data_ptr[i] = static_cast<uint8_t>(i % 4);
722 }
723 // Attempt to abort. Test that this fails before the first release.
724 Status status = client_.Abort(object_id);
725 ASSERT_TRUE(status.IsInvalid());
726 // Release, then abort.
727 ARROW_CHECK_OK(client_.Release(object_id));
728 EXPECT_TRUE(client_.IsInUse(object_id));
729
730 ARROW_CHECK_OK(client_.Abort(object_id));
731 EXPECT_FALSE(client_.IsInUse(object_id));
732
733 // Test for object nonexistence after the abort.
734 ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
735 ASSERT_FALSE(object_buffers[0].data);
736
737 // Create the object successfully this time.
738 CreateObject(client_, object_id, {42, 43}, {1, 2, 3, 4, 5});
739
740 // Test that we can get the object.
741 ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
742 AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5});
743 }
744
TEST_F(TestPlasmaStore,OneIdCreateRepeatedlyTest)745 TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) {
746 const int64_t loop_times = 5;
747
748 ObjectID object_id = random_object_id();
749 std::vector<ObjectBuffer> object_buffers;
750
751 // Test for object nonexistence.
752 ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
753 ASSERT_FALSE(object_buffers[0].data);
754
755 int64_t data_size = 20;
756 uint8_t metadata[] = {5};
757 int64_t metadata_size = sizeof(metadata);
758
759 // Test the sequence: create -> release -> abort -> ...
760 for (int64_t i = 0; i < loop_times; i++) {
761 std::shared_ptr<Buffer> data;
762 ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
763 ARROW_CHECK_OK(client_.Release(object_id));
764 ARROW_CHECK_OK(client_.Abort(object_id));
765 }
766
767 // Test the sequence: create -> seal -> release -> delete -> ...
768 for (int64_t i = 0; i < loop_times; i++) {
769 std::shared_ptr<Buffer> data;
770 ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
771 ARROW_CHECK_OK(client_.Seal(object_id));
772 ARROW_CHECK_OK(client_.Release(object_id));
773 ARROW_CHECK_OK(client_.Delete(object_id));
774 }
775 }
776
TEST_F(TestPlasmaStore,MultipleClientTest)777 TEST_F(TestPlasmaStore, MultipleClientTest) {
778 ObjectID object_id = random_object_id();
779 std::vector<ObjectBuffer> object_buffers;
780
781 // Test for object nonexistence on the first client.
782 bool has_object;
783 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
784 ASSERT_FALSE(has_object);
785
786 // Test for the object being in local Plasma store.
787 // First create and seal object on the second client.
788 int64_t data_size = 100;
789 uint8_t metadata[] = {5};
790 int64_t metadata_size = sizeof(metadata);
791 std::shared_ptr<Buffer> data;
792 ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
793 ARROW_CHECK_OK(client2_.Seal(object_id));
794 // Test that the first client can get the object.
795 ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
796 ASSERT_TRUE(object_buffers[0].data);
797 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
798 ASSERT_TRUE(has_object);
799
800 // Test that one client disconnecting does not interfere with the other.
801 // First create object on the second client.
802 object_id = random_object_id();
803 ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
804 // Disconnect the first client.
805 ARROW_CHECK_OK(client_.Disconnect());
806 // Test that the second client can seal and get the created object.
807 ARROW_CHECK_OK(client2_.Seal(object_id));
808 ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
809 ASSERT_TRUE(object_buffers[0].data);
810 ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
811 ASSERT_TRUE(has_object);
812 }
813
TEST_F(TestPlasmaStore,ManyObjectTest)814 TEST_F(TestPlasmaStore, ManyObjectTest) {
815 // Create many objects on the first client. Seal one third, abort one third,
816 // and leave the last third unsealed.
817 std::vector<ObjectID> object_ids;
818 for (int i = 0; i < 100; i++) {
819 ObjectID object_id = random_object_id();
820 object_ids.push_back(object_id);
821
822 // Test for object nonexistence on the first client.
823 bool has_object;
824 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
825 ASSERT_FALSE(has_object);
826
827 // Test for the object being in local Plasma store.
828 // First create and seal object on the first client.
829 int64_t data_size = 100;
830 uint8_t metadata[] = {5};
831 int64_t metadata_size = sizeof(metadata);
832 std::shared_ptr<Buffer> data;
833 ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
834
835 if (i % 3 == 0) {
836 // Seal one third of the objects.
837 ARROW_CHECK_OK(client_.Seal(object_id));
838 // Test that the first client can get the object.
839 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
840 ASSERT_TRUE(has_object);
841 } else if (i % 3 == 1) {
842 // Abort one third of the objects.
843 ARROW_CHECK_OK(client_.Release(object_id));
844 ARROW_CHECK_OK(client_.Abort(object_id));
845 }
846 }
847 // Disconnect the first client. All unsealed objects should be aborted.
848 ARROW_CHECK_OK(client_.Disconnect());
849
850 // Check that the second client can query the object store for the first
851 // client's objects.
852 int i = 0;
853 for (auto const& object_id : object_ids) {
854 bool has_object;
855 ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
856 if (i % 3 == 0) {
857 // The first third should be sealed.
858 ASSERT_TRUE(has_object);
859 } else {
860 // The rest were aborted, so the object is not in the store.
861 ASSERT_FALSE(has_object);
862 }
863 i++;
864 }
865 }
866
867 #ifdef PLASMA_CUDA
868 using arrow::cuda::CudaBuffer;
869 using arrow::cuda::CudaBufferReader;
870 using arrow::cuda::CudaBufferWriter;
871
872 // actual CUDA device number + 1
873 constexpr int kGpuDeviceNumber = 1;
874
875 namespace {
876
AssertCudaRead(const std::shared_ptr<Buffer> & buffer,const std::vector<uint8_t> & expected_data)877 void AssertCudaRead(const std::shared_ptr<Buffer>& buffer,
878 const std::vector<uint8_t>& expected_data) {
879 std::shared_ptr<CudaBuffer> gpu_buffer;
880 const size_t data_size = expected_data.size();
881
882 ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(buffer));
883 ASSERT_EQ(gpu_buffer->size(), data_size);
884
885 CudaBufferReader reader(gpu_buffer);
886 std::vector<uint8_t> read_data(data_size);
887 ASSERT_OK_AND_EQ(data_size, reader.Read(data_size, read_data.data()));
888
889 for (size_t i = 0; i < data_size; i++) {
890 ASSERT_EQ(read_data[i], expected_data[i]);
891 }
892 }
893
894 } // namespace
895
TEST_F(TestPlasmaStore,GetGPUTest)896 TEST_F(TestPlasmaStore, GetGPUTest) {
897 ObjectID object_id = random_object_id();
898 std::vector<ObjectBuffer> object_buffers;
899
900 // Test for object nonexistence.
901 ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
902 ASSERT_EQ(object_buffers.size(), 1);
903 ASSERT_FALSE(object_buffers[0].data);
904
905 // Test for the object being in local Plasma store.
906 // First create object.
907 uint8_t data[] = {4, 5, 3, 1};
908 int64_t data_size = sizeof(data);
909 uint8_t metadata[] = {42};
910 int64_t metadata_size = sizeof(metadata);
911 std::shared_ptr<Buffer> data_buffer;
912 std::shared_ptr<CudaBuffer> gpu_buffer;
913 ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size,
914 &data_buffer, kGpuDeviceNumber));
915 ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(data_buffer));
916 CudaBufferWriter writer(gpu_buffer);
917 ARROW_CHECK_OK(writer.Write(data, data_size));
918 ARROW_CHECK_OK(client_.Seal(object_id));
919
920 object_buffers.clear();
921 ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
922 ASSERT_EQ(object_buffers.size(), 1);
923 ASSERT_EQ(object_buffers[0].device_num, kGpuDeviceNumber);
924 // Check data
925 AssertCudaRead(object_buffers[0].data, {4, 5, 3, 1});
926 // Check metadata
927 AssertCudaRead(object_buffers[0].metadata, {42});
928 }
929
TEST_F(TestPlasmaStore,DeleteObjectsGPUTest)930 TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
931 ObjectID object_id1 = random_object_id();
932 ObjectID object_id2 = random_object_id();
933
934 // Test for deleting nonexistent object.
935 Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
936 ARROW_CHECK_OK(result);
937 // Test for the object being in local Plasma store.
938 // First create object.
939 int64_t data_size = 100;
940 uint8_t metadata[] = {5};
941 int64_t metadata_size = sizeof(metadata);
942 std::shared_ptr<Buffer> data;
943 ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data,
944 kGpuDeviceNumber));
945 ARROW_CHECK_OK(client_.Seal(object_id1));
946 ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data,
947 kGpuDeviceNumber));
948 ARROW_CHECK_OK(client_.Seal(object_id2));
949 // Release the ref count of Create function.
950 ARROW_CHECK_OK(client_.Release(object_id1));
951 ARROW_CHECK_OK(client_.Release(object_id2));
952 // Increase the ref count by calling Get using client2_.
953 std::vector<ObjectBuffer> object_buffers;
954 ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers));
955 // Objects are still used by client2_.
956 result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
957 ARROW_CHECK_OK(result);
958 // The object is used and it should not be deleted right now.
959 bool has_object = false;
960 ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
961 ASSERT_TRUE(has_object);
962 ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
963 ASSERT_TRUE(has_object);
964 // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer).
965 // client2_ won't send the release request immediately because the trigger
966 // condition is not reached. The release is only added to release cache.
967 object_buffers.clear();
968 // Delete the objects.
969 result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
970 ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
971 ASSERT_FALSE(has_object);
972 ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
973 ASSERT_FALSE(has_object);
974 }
975
TEST_F(TestPlasmaStore,RepeatlyCreateGPUTest)976 TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) {
977 const int64_t loop_times = 100;
978 const int64_t object_num = 5;
979 const int64_t data_size = 40;
980
981 std::vector<ObjectID> object_ids;
982
983 // create new gpu objects
984 for (int64_t i = 0; i < object_num; i++) {
985 object_ids.push_back(random_object_id());
986 ObjectID& object_id = object_ids[i];
987
988 std::shared_ptr<Buffer> data;
989 ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber));
990 ARROW_CHECK_OK(client_.Seal(object_id));
991 ARROW_CHECK_OK(client_.Release(object_id));
992 }
993
994 // delete and create again
995 for (int64_t i = 0; i < loop_times; i++) {
996 ObjectID& object_id = object_ids[i % object_num];
997
998 ARROW_CHECK_OK(client_.Delete(object_id));
999
1000 std::shared_ptr<Buffer> data;
1001 ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber));
1002 ARROW_CHECK_OK(client_.Seal(object_id));
1003 ARROW_CHECK_OK(client_.Release(object_id));
1004 }
1005
1006 // delete all
1007 ARROW_CHECK_OK(client_.Delete(object_ids));
1008 }
1009
TEST_F(TestPlasmaStore,GPUBufferLifetime)1010 TEST_F(TestPlasmaStore, GPUBufferLifetime) {
1011 // ARROW-5924: GPU buffer is allowed to persist after Release()
1012 ObjectID object_id = random_object_id();
1013 const int64_t data_size = 40;
1014
1015 std::shared_ptr<Buffer> create_buff;
1016 ARROW_CHECK_OK(
1017 client_.Create(object_id, data_size, nullptr, 0, &create_buff, kGpuDeviceNumber));
1018 ARROW_CHECK_OK(client_.Seal(object_id));
1019 ARROW_CHECK_OK(client_.Release(object_id));
1020
1021 ObjectBuffer get_buff_1;
1022 ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_1));
1023 ObjectBuffer get_buff_2;
1024 ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_2));
1025 ARROW_CHECK_OK(client_.Release(object_id));
1026 ARROW_CHECK_OK(client_.Release(object_id));
1027
1028 ObjectBuffer get_buff_3;
1029 ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_3));
1030 ARROW_CHECK_OK(client_.Release(object_id));
1031
1032 ARROW_CHECK_OK(client_.Delete(object_id));
1033 }
1034
TEST_F(TestPlasmaStore,MultipleClientGPUTest)1035 TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
1036 ObjectID object_id = random_object_id();
1037 std::vector<ObjectBuffer> object_buffers;
1038
1039 // Test for object nonexistence on the first client.
1040 bool has_object;
1041 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
1042 ASSERT_FALSE(has_object);
1043
1044 // Test for the object being in local Plasma store.
1045 // First create and seal object on the second client.
1046 int64_t data_size = 100;
1047 uint8_t metadata[] = {5};
1048 int64_t metadata_size = sizeof(metadata);
1049 std::shared_ptr<Buffer> data;
1050 ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data,
1051 kGpuDeviceNumber));
1052 ARROW_CHECK_OK(client2_.Seal(object_id));
1053 // Test that the first client can get the object.
1054 ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
1055 ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
1056 ASSERT_TRUE(has_object);
1057
1058 // Test that one client disconnecting does not interfere with the other.
1059 // First create object on the second client.
1060 object_id = random_object_id();
1061 ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data,
1062 kGpuDeviceNumber));
1063 // Disconnect the first client.
1064 ARROW_CHECK_OK(client_.Disconnect());
1065 // Test that the second client can seal and get the created object.
1066 ARROW_CHECK_OK(client2_.Seal(object_id));
1067 object_buffers.clear();
1068 ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
1069 ASSERT_TRUE(has_object);
1070 ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
1071 ASSERT_EQ(object_buffers.size(), 1);
1072 ASSERT_EQ(object_buffers[0].device_num, kGpuDeviceNumber);
1073 AssertCudaRead(object_buffers[0].metadata, {5});
1074 }
1075
1076 #endif // PLASMA_CUDA
1077
1078 } // namespace plasma
1079
main(int argc,char ** argv)1080 int main(int argc, char** argv) {
1081 ::testing::InitGoogleTest(&argc, argv);
1082 plasma::test_executable = std::string(argv[0]);
1083 return RUN_ALL_TESTS();
1084 }
1085