1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include <assert.h>
19 #include <signal.h>
20 #include <stdlib.h>
21 #include <sys/time.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24 
25 #include <memory>
26 #include <thread>
27 
28 #include <gtest/gtest.h>
29 
30 #include "arrow/testing/gtest_util.h"
31 #include "arrow/util/io_util.h"
32 
33 #include "plasma/client.h"
34 #include "plasma/common.h"
35 #include "plasma/plasma.h"
36 #include "plasma/protocol.h"
37 #include "plasma/test_util.h"
38 
39 namespace plasma {
40 
41 using arrow::internal::TemporaryDir;
42 
// Path of the running test binary. TestPlasmaStore::SetUp() strips the file
// name from it to find the directory that holds plasma-store-server.
// NOTE(review): nothing in this part of the file assigns it -- presumably
// main() fills it in from argv[0]; confirm.
std::string test_executable;  // NOLINT
44 
AssertObjectBufferEqual(const ObjectBuffer & object_buffer,const std::vector<uint8_t> & metadata,const std::vector<uint8_t> & data)45 void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
46                              const std::vector<uint8_t>& metadata,
47                              const std::vector<uint8_t>& data) {
48   arrow::AssertBufferEqual(*object_buffer.metadata, metadata);
49   arrow::AssertBufferEqual(*object_buffer.data, data);
50 }
51 
52 class TestPlasmaStore : public ::testing::Test {
53  public:
54   // TODO(pcm): At the moment, stdout of the test gets mixed up with
55   // stdout of the object store. Consider changing that.
56 
SetUp()57   void SetUp() {
58     ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("cli-test-"));
59     store_socket_name_ = temp_dir_->path().ToString() + "store";
60 
61     std::string plasma_directory =
62         test_executable.substr(0, test_executable.find_last_of("/"));
63     std::string plasma_command =
64         plasma_directory + "/plasma-store-server -m 10000000 -s " + store_socket_name_ +
65         " 1> /dev/null 2> /dev/null & " + "echo $! > " + store_socket_name_ + ".pid";
66     PLASMA_CHECK_SYSTEM(system(plasma_command.c_str()));
67     ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));
68     ARROW_CHECK_OK(client2_.Connect(store_socket_name_, ""));
69   }
70 
TearDown()71   virtual void TearDown() {
72     ARROW_CHECK_OK(client_.Disconnect());
73     ARROW_CHECK_OK(client2_.Disconnect());
74     // Kill plasma_store process that we started
75 #ifdef COVERAGE_BUILD
76     // Ask plasma_store to exit gracefully and give it time to write out
77     // coverage files
78     std::string plasma_term_command =
79         "kill -TERM `cat " + store_socket_name_ + ".pid` || exit 0";
80     PLASMA_CHECK_SYSTEM(system(plasma_term_command.c_str()));
81     std::this_thread::sleep_for(std::chrono::milliseconds(200));
82 #endif
83     std::string plasma_kill_command =
84         "kill -KILL `cat " + store_socket_name_ + ".pid` || exit 0";
85     PLASMA_CHECK_SYSTEM(system(plasma_kill_command.c_str()));
86   }
87 
CreateObject(PlasmaClient & client,const ObjectID & object_id,const std::vector<uint8_t> & metadata,const std::vector<uint8_t> & data,bool release=true)88   void CreateObject(PlasmaClient& client, const ObjectID& object_id,
89                     const std::vector<uint8_t>& metadata,
90                     const std::vector<uint8_t>& data, bool release = true) {
91     std::shared_ptr<Buffer> data_buffer;
92     ARROW_CHECK_OK(client.Create(object_id, data.size(), metadata.data(), metadata.size(),
93                                  &data_buffer));
94     for (size_t i = 0; i < data.size(); i++) {
95       data_buffer->mutable_data()[i] = data[i];
96     }
97     ARROW_CHECK_OK(client.Seal(object_id));
98     if (release) {
99       ARROW_CHECK_OK(client.Release(object_id));
100     }
101   }
102 
103  protected:
104   PlasmaClient client_;
105   PlasmaClient client2_;
106   std::unique_ptr<TemporaryDir> temp_dir_;
107   std::string store_socket_name_;
108 };
109 
// A client that subscribes after an object has been sealed must still be
// notified about that object, and afterwards about its deletion (reported
// with both sizes set to -1).
TEST_F(TestPlasmaStore, NewSubscriberTest) {
  PlasmaClient producer;
  PlasmaClient subscriber;

  ARROW_CHECK_OK(producer.Connect(store_socket_name_, ""));
  ARROW_CHECK_OK(subscriber.Connect(store_socket_name_, ""));

  ObjectID sealed_id = random_object_id();

  // Put a sealed object into the store before anybody has subscribed.
  const int64_t sealed_data_size = 100;
  uint8_t sealed_metadata[] = {5};
  const int64_t sealed_metadata_size = sizeof(sealed_metadata);
  std::shared_ptr<Buffer> sealed_buffer;
  ARROW_CHECK_OK(producer.Create(sealed_id, sealed_data_size, sealed_metadata,
                                 sealed_metadata_size, &sealed_buffer));
  ARROW_CHECK_OK(producer.Seal(sealed_id));

  // Subscribe only now; the already existing object must still be announced.
  int notification_fd = -1;
  ARROW_CHECK_OK(subscriber.Subscribe(&notification_fd));
  ASSERT_GT(notification_fd, 0);

  ObjectID notified_id = random_object_id();
  int64_t notified_data_size = 0;
  int64_t notified_metadata_size = 0;
  ARROW_CHECK_OK(subscriber.GetNotification(notification_fd, &notified_id,
                                            &notified_data_size,
                                            &notified_metadata_size));
  ASSERT_EQ(sealed_id, notified_id);
  ASSERT_EQ(sealed_data_size, notified_data_size);
  ASSERT_EQ(sealed_metadata_size, notified_metadata_size);

  // Remove the object again.
  ARROW_CHECK_OK(producer.Release(sealed_id));
  ARROW_CHECK_OK(producer.Delete(sealed_id));

  // The deletion shows up as a notification carrying -1 for both sizes.
  ARROW_CHECK_OK(subscriber.GetNotification(notification_fd, &notified_id,
                                            &notified_data_size,
                                            &notified_metadata_size));
  ASSERT_EQ(sealed_id, notified_id);
  ASSERT_EQ(-1, notified_data_size);
  ASSERT_EQ(-1, notified_metadata_size);

  ARROW_CHECK_OK(subscriber.Disconnect());
  ARROW_CHECK_OK(producer.Disconnect());
}
155 
// Objects created with CreateAndSealBatch must each produce their own
// notification, delivered in the order in which the ids were passed.
TEST_F(TestPlasmaStore, BatchNotificationTest) {
  PlasmaClient producer;
  PlasmaClient subscriber;

  ARROW_CHECK_OK(producer.Connect(store_socket_name_, ""));
  ARROW_CHECK_OK(subscriber.Connect(store_socket_name_, ""));

  int notification_fd = -1;
  ARROW_CHECK_OK(subscriber.Subscribe(&notification_fd));
  ASSERT_GT(notification_fd, 0);

  ObjectID first_id = random_object_id();
  ObjectID second_id = random_object_id();

  std::vector<ObjectID> batch_ids = {first_id, second_id};

  std::vector<std::string> batch_data = {"hello", "world!"};
  std::vector<std::string> batch_metadata = {"1", "23"};
  ARROW_CHECK_OK(producer.CreateAndSealBatch(batch_ids, batch_data, batch_metadata));

  ObjectID notified_id = random_object_id();
  int64_t notified_data_size = 0;
  int64_t notified_metadata_size = 0;

  // First notification: "hello" with metadata "1".
  ARROW_CHECK_OK(subscriber.GetNotification(notification_fd, &notified_id,
                                            &notified_data_size,
                                            &notified_metadata_size));
  ASSERT_EQ(notified_id, first_id);
  ASSERT_EQ(notified_data_size, 5);
  ASSERT_EQ(notified_metadata_size, 1);

  // Second notification: "world!" with metadata "23".
  ARROW_CHECK_OK(subscriber.GetNotification(notification_fd, &notified_id,
                                            &notified_data_size,
                                            &notified_metadata_size));
  ASSERT_EQ(notified_id, second_id);
  ASSERT_EQ(notified_data_size, 6);
  ASSERT_EQ(notified_metadata_size, 2);

  ARROW_CHECK_OK(subscriber.Disconnect());
  ARROW_CHECK_OK(producer.Disconnect());
}
193 
// Seal() must report "object not found" for an unknown id and "already
// sealed" when called a second time on the same object.
TEST_F(TestPlasmaStore, SealErrorsTest) {
  ObjectID object_id = random_object_id();

  // Nothing has been created under this id yet.
  Status seal_status = client_.Seal(object_id);
  ASSERT_TRUE(IsPlasmaObjectNotFound(seal_status));

  // Create and seal the object, keeping it in use (release == false).
  std::vector<uint8_t> payload(100, 0);
  CreateObject(client_, object_id, {42}, payload, false);

  // A second seal of the same object is rejected.
  seal_status = client_.Seal(object_id);
  ASSERT_TRUE(IsPlasmaObjectAlreadySealed(seal_status));
  ARROW_CHECK_OK(client_.Release(object_id));
}
209 
// With a 5 MiB quota a client can hold one 3 MiB object; creating a second
// one evicts the first, and a single 7 MiB object is refused outright.
TEST_F(TestPlasmaStore, SetQuotaBasicTest) {
  // Asks the store, through client_, whether it currently holds `id`.
  auto store_has = [this](const ObjectID& id) {
    bool present = false;
    ARROW_CHECK_OK(client_.Contains(id, &present));
    return present;
  };

  ObjectID first_id = random_object_id();
  ObjectID second_id = random_object_id();

  ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024));
  std::vector<uint8_t> three_mib(3 * 1024 * 1024, 0);

  // The first object fits into the quota.
  CreateObject(client_, first_id, {42}, three_mib, true);
  ASSERT_TRUE(store_has(first_id));

  // The second object pushes the first one out.
  CreateObject(client_, second_id, {42}, three_mib, true);
  ASSERT_TRUE(store_has(second_id));
  ASSERT_FALSE(store_has(first_id));

  // 7 MiB can never fit into the quota, while 4 MiB still can.
  std::shared_ptr<Buffer> created_buffer;
  Status oversized =
      client_.Create(random_object_id(), 7 * 1024 * 1024, {}, 0, &created_buffer);
  ASSERT_FALSE(oversized.ok());
  Status within_quota =
      client_.Create(random_object_id(), 4 * 1024 * 1024, {}, 0, &created_buffer);
  ASSERT_TRUE(within_quota.ok());
}
237 
// Without a quota, another client's allocations can evict this client's
// object; once a quota is set, the object survives the same pressure.
TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) {
  // Asks the store, through client_, whether it currently holds `id`.
  auto store_has = [this](const ObjectID& id) {
    bool present = false;
    ARROW_CHECK_OK(client_.Contains(id, &present));
    return present;
  };

  ObjectID unprotected_id = random_object_id();
  ObjectID protected_id = random_object_id();

  std::vector<uint8_t> three_mib(3 * 1024 * 1024, 0);

  // Lets client2_ create ten 3 MiB objects in a row.
  auto flood_from_client2 = [this, &three_mib]() {
    for (int round = 0; round < 10; round++) {
      CreateObject(client2_, random_object_id(), {42}, three_mib, true);
    }
  };

  // Object created before any quota is configured.
  CreateObject(client_, unprotected_id, {42}, three_mib, true);
  ASSERT_TRUE(store_has(unprotected_id));

  // The other client's objects evict it.
  flood_from_client2();
  ASSERT_FALSE(store_has(unprotected_id));

  // Same scenario again, this time with a quota for client_.
  ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024));
  CreateObject(client_, protected_id, {42}, three_mib, true);
  ASSERT_TRUE(store_has(protected_id));

  // Now the object stays in the store.
  flood_from_client2();
  ASSERT_TRUE(store_has(protected_id));
}
274 
// A client working under a quota must not evict objects that belong to a
// client without one.
TEST_F(TestPlasmaStore, SetQuotaProtectsOtherClients) {
  // Asks the store, through client_, whether it currently holds `id`.
  auto store_has = [this](const ObjectID& id) {
    bool present = false;
    ARROW_CHECK_OK(client_.Contains(id, &present));
    return present;
  };

  ObjectID bystander_id = random_object_id();

  std::vector<uint8_t> three_mib(3 * 1024 * 1024, 0);

  // client_ stores an object without having a quota.
  CreateObject(client_, bystander_id, {42}, three_mib, true);
  ASSERT_TRUE(store_has(bystander_id));

  // client2_ gets a quota and then creates many objects.
  ARROW_CHECK_OK(client2_.SetClientOptions("client2", 5 * 1024 * 1024));
  for (int round = 0; round < 10; round++) {
    CreateObject(client2_, random_object_id(), {42}, three_mib, true);
  }

  // The object of client_ is NOT evicted.
  ASSERT_TRUE(store_has(bystander_id));
}
296 
// Exercises the limits on SetClientOptions: a quota that is too large is
// rejected, a client cannot set its quota twice, and the quotas of all
// clients together are capped as well.
TEST_F(TestPlasmaStore, SetQuotaCannotExceedSeventyPercentMemory) {
  Status too_large = client_.SetClientOptions("client1", 8 * 1024 * 1024);
  ASSERT_FALSE(too_large.ok());

  Status accepted = client_.SetClientOptions("client1", 5 * 1024 * 1024);
  ASSERT_TRUE(accepted.ok());

  // cannot set quota twice
  Status repeated = client_.SetClientOptions("client1", 5 * 1024 * 1024);
  ASSERT_FALSE(repeated.ok());

  // cannot exceed 70% summed
  Status sum_too_large = client2_.SetClientOptions("client2", 3 * 1024 * 1024);
  ASSERT_FALSE(sum_too_large.ok());

  Status sum_accepted = client2_.SetClientOptions("client2", 1 * 1024 * 1024);
  ASSERT_TRUE(sum_accepted.ok());
}
306 
// Two pinned 3 MiB objects do not both fit into a 5 MiB quota, so one of them
// ends up in the global LRU and can later be evicted by another client.
TEST_F(TestPlasmaStore, SetQuotaDemotesPinnedObjectsToGlobalLRU) {
  // Asks the store, through client_, whether it currently holds `id`.
  auto store_has = [this](const ObjectID& id) {
    bool present = false;
    ARROW_CHECK_OK(client_.Contains(id, &present));
    return present;
  };

  ASSERT_TRUE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok());

  ObjectID demoted_id = random_object_id();
  ObjectID quota_id = random_object_id();
  std::vector<uint8_t> three_mib(3 * 1024 * 1024, 0);

  // The quota cannot hold both objects, but the global LRU can. Both are
  // kept in use (release == false).
  CreateObject(client_, demoted_id, {42}, three_mib, false);
  CreateObject(client_, quota_id, {42}, three_mib, false);
  ASSERT_TRUE(store_has(demoted_id));
  ASSERT_TRUE(store_has(quota_id));

  // After releasing both, the first object lives in the global LRU and the
  // second one in the quota.
  ARROW_CHECK_OK(client_.Release(demoted_id));
  ARROW_CHECK_OK(client_.Release(quota_id));

  // Pressure from client2_ flushes the first object out of the store.
  for (int round = 0; round < 10; round++) {
    CreateObject(client2_, random_object_id(), {42}, three_mib, true);
  }
  ASSERT_FALSE(store_has(demoted_id));
  ASSERT_TRUE(store_has(quota_id));
}
336 
// An object protected by a client's quota survives that client's disconnect,
// but from then on it can be evicted like any other object.
TEST_F(TestPlasmaStore, SetQuotaDemoteDisconnectToGlobalLRU) {
  // Asks the store, through client_, whether it currently holds `id`.
  auto store_has = [this](const ObjectID& id) {
    bool present = false;
    ARROW_CHECK_OK(client_.Contains(id, &present));
    return present;
  };

  PlasmaClient quota_client;
  ARROW_CHECK_OK(quota_client.Connect(store_socket_name_, ""));
  ARROW_CHECK_OK(quota_client.SetClientOptions("local", 5 * 1024 * 1024));

  ObjectID quota_object = random_object_id();
  std::vector<uint8_t> three_mib(3 * 1024 * 1024, 0);

  // Lets client_ create ten 3 MiB objects in a row.
  auto flood_from_client = [this, &three_mib]() {
    for (int round = 0; round < 10; round++) {
      CreateObject(client_, random_object_id(), {42}, three_mib, true);
    }
  };

  // While the owning client is connected, the object withstands the pressure.
  CreateObject(quota_client, quota_object, {42}, three_mib, true);
  flood_from_client();
  ASSERT_TRUE(store_has(quota_object));

  // Disconnecting the owner does not remove the object by itself ...
  ARROW_CHECK_OK(quota_client.Disconnect());
  ASSERT_TRUE(store_has(quota_object));

  // ... but the same pressure now evicts it.
  flood_from_client();
  ASSERT_FALSE(store_has(quota_object));
}
366 
TEST_F(TestPlasmaStore,SetQuotaCleanupObjectMetadata)367 TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) {
368   PlasmaClient local_client;
369   ARROW_CHECK_OK(local_client.Connect(store_socket_name_, ""));
370   ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024));
371 
372   ObjectID id0 = random_object_id();
373   ObjectID id1 = random_object_id();
374   ObjectID id2 = random_object_id();
375   ObjectID id3 = random_object_id();
376   std::vector<uint8_t> big_data(3 * 1024 * 1024, 0);
377   std::vector<uint8_t> small_data(1 * 1024 * 1024, 0);
378   CreateObject(local_client, id0, {42}, small_data, false);
379   CreateObject(local_client, id1, {42}, big_data, true);
380   CreateObject(local_client, id2, {42}, big_data,
381                true);  // spills id0 to global, evicts id1
382   CreateObject(local_client, id3, {42}, small_data, false);
383 
384   ASSERT_TRUE(client_.DebugString().find("num clients with quota: 1") !=
385               std::string::npos);
386   ASSERT_TRUE(client_.DebugString().find("quota map size: 2") != std::string::npos);
387   ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 1") !=
388               std::string::npos);
389   ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") !=
390               std::string::npos);
391   ASSERT_TRUE(client_.DebugString().find("(local) num objects: 2") != std::string::npos);
392 
393   // release id0
394   ARROW_CHECK_OK(local_client.Release(id0));
395   ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 1") !=
396               std::string::npos);
397 
398   // delete everything
399   ARROW_CHECK_OK(local_client.Delete(id0));
400   ARROW_CHECK_OK(local_client.Delete(id2));
401   ARROW_CHECK_OK(local_client.Delete(id3));
402   ARROW_CHECK_OK(local_client.Release(id3));
403   ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos);
404   ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") !=
405               std::string::npos);
406   ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") !=
407               std::string::npos);
408   ASSERT_TRUE(client_.DebugString().find("(local) num objects: 0") != std::string::npos);
409 
410   ARROW_CHECK_OK(local_client.Disconnect());
411   int tries = 10;  // wait for disconnect to complete
412   while (tries > 0 &&
413          client_.DebugString().find("num clients with quota: 0") == std::string::npos) {
414     std::this_thread::sleep_for(std::chrono::milliseconds(200));
415     tries -= 1;
416   }
417   ASSERT_TRUE(client_.DebugString().find("num clients with quota: 0") !=
418               std::string::npos);
419   ASSERT_TRUE(client_.DebugString().find("(global lru) capacity: 10000000") !=
420               std::string::npos);
421   ASSERT_TRUE(client_.DebugString().find("(global lru) used: 0%") != std::string::npos);
422 }
423 
// After a client with a quota disconnects, its quota bookkeeping must be gone
// and its surviving objects must be accounted for by the global LRU.
TEST_F(TestPlasmaStore, SetQuotaCleanupClientDisconnect) {
  // Fetches a fresh debug string through client_ and looks for `needle`.
  auto debug_mentions = [this](const std::string& needle) {
    return client_.DebugString().find(needle) != std::string::npos;
  };

  PlasmaClient quota_client;
  ARROW_CHECK_OK(quota_client.Connect(store_socket_name_, ""));
  ARROW_CHECK_OK(quota_client.SetClientOptions("local", 5 * 1024 * 1024));

  ObjectID id1 = random_object_id();
  ObjectID id2 = random_object_id();
  ObjectID id3 = random_object_id();
  std::vector<uint8_t> three_mib(3 * 1024 * 1024, 0);
  std::vector<uint8_t> one_mib(1 * 1024 * 1024, 0);
  CreateObject(quota_client, id1, {42}, three_mib, true);
  CreateObject(quota_client, id2, {42}, three_mib, true);
  CreateObject(quota_client, id3, {42}, one_mib, false);

  ARROW_CHECK_OK(quota_client.Disconnect());
  // Poll up to ten times, 200 ms apart, for the disconnect to complete.
  for (int attempts_left = 10;
       attempts_left > 0 && !debug_mentions("num clients with quota: 0");
       attempts_left -= 1) {
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
  }
  ASSERT_TRUE(debug_mentions("num clients with quota: 0"));
  ASSERT_TRUE(debug_mentions("quota map size: 0"));
  ASSERT_TRUE(debug_mentions("pinned quota map size: 0"));
  ASSERT_TRUE(debug_mentions("(global lru) num objects: 2"));
  ASSERT_TRUE(debug_mentions("(global lru) capacity: 10000000"));
  ASSERT_TRUE(debug_mentions("(global lru) used: 41.9431%"));
}
457 
// Refresh() must protect the refreshed objects from the next evictions, so
// that the objects following them in LRU order are evicted instead.
TEST_F(TestPlasmaStore, RefreshLRUTest) {
  // Asks the store, through client_, whether it currently holds `id`.
  auto store_has = [this](const ObjectID& id) {
    bool present = false;
    ARROW_CHECK_OK(client_.Contains(id, &present));
    return present;
  };

  std::vector<ObjectID> initial_ids;
  for (int n = 0; n < 10; ++n) {
    initial_ids.push_back(random_object_id());
  }

  std::vector<uint8_t> one_megabyte(1 * 1000 * 1000, 0);

  // Ten of these objects fit into the store: the very first one must still
  // be present after each creation.
  for (const auto& initial_id : initial_ids) {
    CreateObject(client_, initial_id, {}, one_megabyte, true);
    ASSERT_TRUE(store_has(initial_ids[0]));
  }

  // The eleventh object forces an eviction.
  ObjectID extra_id = random_object_id();
  CreateObject(client_, extra_id, {}, one_megabyte, true);

  // the first two objects got evicted (20% of the store)
  ASSERT_FALSE(store_has(initial_ids[0]));
  ASSERT_FALSE(store_has(initial_ids[1]));

  ARROW_CHECK_OK(client_.Refresh({initial_ids[2], initial_ids[3]}));

  // Two more objects trigger the next eviction.
  extra_id = random_object_id();
  CreateObject(client_, extra_id, {}, one_megabyte, true);
  extra_id = random_object_id();
  CreateObject(client_, extra_id, {}, one_megabyte, true);

  // The refreshed objects are still there ...
  ASSERT_TRUE(store_has(initial_ids[2]));
  ASSERT_TRUE(store_has(initial_ids[3]));

  // ... and the next object in LRU order was evicted in their place.
  ASSERT_FALSE(store_has(initial_ids[4]));
}
502 
// Delete() succeeds for unknown ids, and deleting an object that is still in
// use only takes effect once the object has been released.
TEST_F(TestPlasmaStore, DeleteTest) {
  ObjectID target_id = random_object_id();

  // Deleting an object that does not exist is not an error.
  ARROW_CHECK_OK(client_.Delete(target_id));

  // Create and seal the object; client_ keeps using it.
  const int64_t payload_size = 100;
  uint8_t meta_bytes[] = {5};
  const int64_t meta_size = sizeof(meta_bytes);
  std::shared_ptr<Buffer> payload;
  ARROW_CHECK_OK(
      client_.Create(target_id, payload_size, meta_bytes, meta_size, &payload));
  ARROW_CHECK_OK(client_.Seal(target_id));

  // The delete request succeeds, but the object is still in the store.
  ARROW_CHECK_OK(client_.Delete(target_id));
  bool still_present = false;
  ARROW_CHECK_OK(client_.Contains(target_id, &still_present));
  ASSERT_TRUE(still_present);

  // The object was only marked as to-be-deleted; releasing it lets the
  // deletion happen.
  ARROW_CHECK_OK(client_.Release(target_id));
  ARROW_CHECK_OK(client_.Contains(target_id, &still_present));
  ASSERT_FALSE(still_present);
  ARROW_CHECK_OK(client_.Delete(target_id));
}
531 
// Covers the multi-object Delete() overload: deleting unknown ids succeeds,
// objects still in use by another client are kept, and they are gone once
// that client has dropped its buffers and deleted them as well.
TEST_F(TestPlasmaStore, DeleteObjectsTest) {
  ObjectID object_id1 = random_object_id();
  ObjectID object_id2 = random_object_id();

  // Test for deleting nonexistent object.
  Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  // Test for the object being in local Plasma store.
  // First create object.
  int64_t data_size = 100;
  uint8_t metadata[] = {5};
  int64_t metadata_size = sizeof(metadata);
  std::shared_ptr<Buffer> data;
  ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
  ARROW_CHECK_OK(client_.Seal(object_id1));
  ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
  ARROW_CHECK_OK(client_.Seal(object_id2));
  // Release the ref count of Create function.
  ARROW_CHECK_OK(client_.Release(object_id1));
  ARROW_CHECK_OK(client_.Release(object_id2));
  // Increase the ref count by calling Get using client2_.
  std::vector<ObjectBuffer> object_buffers;
  ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers));
  // Objects are still used by client2_.
  result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  // The object is used and it should not be deleted right now.
  bool has_object = false;
  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
  ASSERT_TRUE(has_object);
  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
  ASSERT_TRUE(has_object);
  // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer).
  // client2_ won't send the release request immediately because the trigger
  // condition is not reached. The release is only added to release cache.
  object_buffers.clear();
  // Delete the objects. The status is checked like every other Delete() call
  // in this test, so that a failing request cannot go unnoticed.
  result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
  ASSERT_FALSE(has_object);
  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
  ASSERT_FALSE(has_object);
}
575 
// Contains() must report false for an unknown id and true once the object
// has been created and sealed.
TEST_F(TestPlasmaStore, ContainsTest) {
  ObjectID object_id = random_object_id();

  // Test for object nonexistence. The flag is seeded with the opposite of
  // the expected answer, so the assertion checks what Contains() wrote
  // rather than an uninitialized value.
  bool has_object = true;
  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
  ASSERT_FALSE(has_object);

  // Test for the object being in local Plasma store.
  // First create object.
  std::vector<uint8_t> data(100, 0);
  CreateObject(client_, object_id, {42}, data);
  std::vector<ObjectBuffer> object_buffers;
  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
  ASSERT_TRUE(has_object);
}
593 
// Get() on an unknown id yields an empty ObjectBuffer; on an existing object
// it yields the stored bytes, and the object counts as in use for as long as
// one of the returned buffers is alive.
TEST_F(TestPlasmaStore, GetTest) {
  std::vector<ObjectBuffer> fetched;

  ObjectID target_id = random_object_id();

  // Nothing is stored under this id yet: with a timeout of 0 the result has
  // neither metadata nor data.
  ARROW_CHECK_OK(client_.Get({target_id}, 0, &fetched));
  ASSERT_EQ(fetched.size(), 1);
  ASSERT_FALSE(fetched[0].metadata);
  ASSERT_FALSE(fetched[0].data);
  EXPECT_FALSE(client_.IsInUse(target_id));

  // Store the object; CreateObject releases it again by default.
  std::vector<uint8_t> payload = {3, 5, 6, 7, 9};
  CreateObject(client_, target_id, {42}, payload);
  EXPECT_FALSE(client_.IsInUse(target_id));

  fetched.clear();
  ARROW_CHECK_OK(client_.Get({target_id}, -1, &fetched));
  ASSERT_EQ(fetched.size(), 1);
  ASSERT_EQ(fetched[0].device_num, 0);
  AssertObjectBufferEqual(fetched[0], {42}, {3, 5, 6, 7, 9});

  // Holding on to the metadata buffer alone keeps the object in use.
  {
    auto metadata_only = fetched[0].metadata;
    fetched.clear();
    ::arrow::AssertBufferEqual(*metadata_only, std::string{42});
    EXPECT_TRUE(client_.IsInUse(target_id));
  }
  // With the last buffer gone the object is released automatically.
  EXPECT_FALSE(client_.IsInUse(target_id));
}
628 
// Covers the older Get() overload that writes into caller-provided
// ObjectBuffers: objects fetched this way stay in use until they are
// released explicitly.
TEST_F(TestPlasmaStore, LegacyGetTest) {
  ObjectID target_id = random_object_id();
  {
    ObjectBuffer fetched;

    // Nothing is stored under this id yet.
    ARROW_CHECK_OK(client_.Get(&target_id, 1, 0, &fetched));
    ASSERT_FALSE(fetched.metadata);
    ASSERT_FALSE(fetched.data);
    EXPECT_FALSE(client_.IsInUse(target_id));

    // Store the object; CreateObject releases it again by default.
    std::vector<uint8_t> payload = {3, 5, 6, 7, 9};
    CreateObject(client_, target_id, {42}, payload);
    EXPECT_FALSE(client_.IsInUse(target_id));

    ARROW_CHECK_OK(client_.Get(&target_id, 1, -1, &fetched));
    AssertObjectBufferEqual(fetched, {42}, {3, 5, 6, 7, 9});
  }
  // Dropping the ObjectBuffer does not release the object; that has to be
  // done by hand.
  EXPECT_TRUE(client_.IsInUse(target_id));
  ARROW_CHECK_OK(client_.Release(target_id));
  EXPECT_FALSE(client_.IsInUse(target_id));
}
654 
// A single Get() call for several ids must return the buffers in the same
// order as the requested ids.
TEST_F(TestPlasmaStore, MultipleGetTest) {
  ObjectID first_id = random_object_id();
  ObjectID second_id = random_object_id();
  std::vector<ObjectID> requested_ids = {first_id, second_id};
  std::vector<ObjectBuffer> fetched;

  const int64_t payload_size = 4;
  uint8_t meta_bytes[] = {5};
  const int64_t meta_size = sizeof(meta_bytes);
  std::shared_ptr<Buffer> payload;

  // Tag the first byte of each object so the two can be told apart.
  ARROW_CHECK_OK(
      client_.Create(first_id, payload_size, meta_bytes, meta_size, &payload));
  payload->mutable_data()[0] = 1;
  ARROW_CHECK_OK(client_.Seal(first_id));

  ARROW_CHECK_OK(
      client_.Create(second_id, payload_size, meta_bytes, meta_size, &payload));
  payload->mutable_data()[0] = 2;
  ARROW_CHECK_OK(client_.Seal(second_id));

  ARROW_CHECK_OK(client_.Get(requested_ids, -1, &fetched));
  ASSERT_EQ(fetched[0].data->data()[0], 1);
  ASSERT_EQ(fetched[1].data->data()[0], 2);
}
677 
// Objects created with CreateAndSealBatch must be retrievable with exactly
// the data they were created from.
TEST_F(TestPlasmaStore, BatchCreateTest) {
  ObjectID first_id = random_object_id();
  ObjectID second_id = random_object_id();
  std::vector<ObjectID> batch_ids = {first_id, second_id};

  std::vector<std::string> batch_data = {"hello", "world"};
  std::vector<std::string> batch_metadata = {"1", "2"};

  ARROW_CHECK_OK(client_.CreateAndSealBatch(batch_ids, batch_data, batch_metadata));

  std::vector<ObjectBuffer> fetched;

  ARROW_CHECK_OK(client_.Get(batch_ids, -1, &fetched));

  // Copy the raw bytes of both data buffers into strings for comparison.
  std::string first_text, second_text;
  const auto& first_buffer = fetched[0].data;
  first_text.assign(reinterpret_cast<const char*>(first_buffer->data()),
                    first_buffer->size());
  const auto& second_buffer = fetched[1].data;
  second_text.assign(reinterpret_cast<const char*>(second_buffer->data()),
                     second_buffer->size());

  ASSERT_STREQ(first_text.c_str(), "hello");
  ASSERT_STREQ(second_text.c_str(), "world");
}
701 
// Abort() is refused while the creating client has not yet released the
// object; after a release it removes the unsealed object, and the same id
// can then be used for a new object.
TEST_F(TestPlasmaStore, AbortTest) {
  ObjectID target_id = random_object_id();
  std::vector<ObjectBuffer> fetched;

  // Nothing is stored under this id yet.
  ARROW_CHECK_OK(client_.Get({target_id}, 0, &fetched));
  ASSERT_FALSE(fetched[0].data);

  // Create the object that is going to be aborted.
  const int64_t payload_size = 4;
  uint8_t meta_bytes[] = {5};
  const int64_t meta_size = sizeof(meta_bytes);
  std::shared_ptr<Buffer> payload;
  ARROW_CHECK_OK(
      client_.Create(target_id, payload_size, meta_bytes, meta_size, &payload));
  uint8_t* raw_payload = payload->mutable_data();
  // Fill only the first half of the buffer.
  for (int64_t offset = 0; offset < payload_size / 2; offset++) {
    raw_payload[offset] = static_cast<uint8_t>(offset % 4);
  }
  // Aborting before the first release must fail with an Invalid status.
  ASSERT_TRUE(client_.Abort(target_id).IsInvalid());
  // After the release the object still counts as in use until it is aborted.
  ARROW_CHECK_OK(client_.Release(target_id));
  EXPECT_TRUE(client_.IsInUse(target_id));

  ARROW_CHECK_OK(client_.Abort(target_id));
  EXPECT_FALSE(client_.IsInUse(target_id));

  // The aborted object must be gone.
  ARROW_CHECK_OK(client_.Get({target_id}, 0, &fetched));
  ASSERT_FALSE(fetched[0].data);

  // This time create the object under the same id all the way through.
  CreateObject(client_, target_id, {42, 43}, {1, 2, 3, 4, 5});

  // It can be fetched with its full contents.
  ARROW_CHECK_OK(client_.Get({target_id}, -1, &fetched));
  AssertObjectBufferEqual(fetched[0], {42, 43}, {1, 2, 3, 4, 5});
}
744 
// The same object id can be reused over and over, both after an abort and
// after a delete.
TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) {
  const int64_t rounds = 5;

  ObjectID reused_id = random_object_id();
  std::vector<ObjectBuffer> fetched;

  // Nothing is stored under this id yet.
  ARROW_CHECK_OK(client_.Get({reused_id}, 0, &fetched));
  ASSERT_FALSE(fetched[0].data);

  const int64_t payload_size = 20;
  uint8_t meta_bytes[] = {5};
  const int64_t meta_size = sizeof(meta_bytes);

  // Repeat: create -> release -> abort.
  for (int64_t round = 0; round < rounds; round++) {
    std::shared_ptr<Buffer> payload;
    ARROW_CHECK_OK(
        client_.Create(reused_id, payload_size, meta_bytes, meta_size, &payload));
    ARROW_CHECK_OK(client_.Release(reused_id));
    ARROW_CHECK_OK(client_.Abort(reused_id));
  }

  // Repeat: create -> seal -> release -> delete.
  for (int64_t round = 0; round < rounds; round++) {
    std::shared_ptr<Buffer> payload;
    ARROW_CHECK_OK(
        client_.Create(reused_id, payload_size, meta_bytes, meta_size, &payload));
    ARROW_CHECK_OK(client_.Seal(reused_id));
    ARROW_CHECK_OK(client_.Release(reused_id));
    ARROW_CHECK_OK(client_.Delete(reused_id));
  }
}
776 
// Exercises two clients of the same store: an object sealed by client2_ must
// be visible to client_, and disconnecting client_ must not prevent client2_
// from sealing and fetching an object it created beforehand.
TEST_F(TestPlasmaStore, MultipleClientTest) {
  ObjectID object_id = random_object_id();
  std::vector<ObjectBuffer> fetched;

  // Initially the first client must not know the object.
  bool present;
  ARROW_CHECK_OK(client_.Contains(object_id, &present));
  ASSERT_FALSE(present);

  // Part 1: the second client creates and seals the object ...
  int64_t data_size = 100;
  uint8_t metadata[] = {5};
  int64_t metadata_size = sizeof(metadata);
  std::shared_ptr<Buffer> buffer;
  ARROW_CHECK_OK(
      client2_.Create(object_id, data_size, metadata, metadata_size, &buffer));
  ARROW_CHECK_OK(client2_.Seal(object_id));
  // ... and the first client is able to fetch it and sees it as contained.
  ARROW_CHECK_OK(client_.Get({object_id}, -1, &fetched));
  ASSERT_TRUE(fetched[0].data);
  ARROW_CHECK_OK(client_.Contains(object_id, &present));
  ASSERT_TRUE(present);

  // Part 2: a disconnect of one client must leave the other one functional.
  // The second client creates (but does not yet seal) a new object.
  object_id = random_object_id();
  ARROW_CHECK_OK(
      client2_.Create(object_id, data_size, metadata, metadata_size, &buffer));
  // The first client goes away in the meantime.
  ARROW_CHECK_OK(client_.Disconnect());
  // The second client can still seal and retrieve its object.
  ARROW_CHECK_OK(client2_.Seal(object_id));
  ARROW_CHECK_OK(client2_.Get({object_id}, -1, &fetched));
  ASSERT_TRUE(fetched[0].data);
  ARROW_CHECK_OK(client2_.Contains(object_id, &present));
  ASSERT_TRUE(present);
}
813 
// Creates 100 objects on client_, treating them in three interleaved groups
// (by index modulo 3): sealed, explicitly aborted, and left unsealed. After
// client_ disconnects, client2_ must see only the sealed group.
TEST_F(TestPlasmaStore, ManyObjectTest) {
  std::vector<ObjectID> object_ids;
  for (int i = 0; i < 100; i++) {
    ObjectID object_id = random_object_id();
    object_ids.push_back(object_id);

    // A brand-new ID must not be contained in the store.
    bool present;
    ARROW_CHECK_OK(client_.Contains(object_id, &present));
    ASSERT_FALSE(present);

    // Create the object on the first client.
    int64_t data_size = 100;
    uint8_t metadata[] = {5};
    int64_t metadata_size = sizeof(metadata);
    std::shared_ptr<Buffer> buffer;
    ARROW_CHECK_OK(
        client_.Create(object_id, data_size, metadata, metadata_size, &buffer));

    const int group = i % 3;
    if (group == 0) {
      // Group 0: seal, then confirm the store reports the object.
      ARROW_CHECK_OK(client_.Seal(object_id));
      ARROW_CHECK_OK(client_.Contains(object_id, &present));
      ASSERT_TRUE(present);
    } else if (group == 1) {
      // Group 1: release and abort right away.
      ARROW_CHECK_OK(client_.Release(object_id));
      ARROW_CHECK_OK(client_.Abort(object_id));
    }
    // Group 2: intentionally left created but unsealed.
  }
  // Disconnecting the first client is expected to abort every unsealed object.
  ARROW_CHECK_OK(client_.Disconnect());

  // Query the store through the second client for each of the first client's
  // objects, in creation order.
  for (size_t idx = 0; idx < object_ids.size(); ++idx) {
    bool present;
    ARROW_CHECK_OK(client2_.Contains(object_ids[idx], &present));
    if (idx % 3 == 0) {
      // Sealed objects survive the disconnect.
      ASSERT_TRUE(present);
    } else {
      // Aborted objects (explicitly or by the disconnect) are gone.
      ASSERT_FALSE(present);
    }
  }
}
866 
867 #ifdef PLASMA_CUDA
868 using arrow::cuda::CudaBuffer;
869 using arrow::cuda::CudaBufferReader;
870 using arrow::cuda::CudaBufferWriter;
871 
// Device number handed to the Create() calls of the GPU tests in this file and
// compared against ObjectBuffer::device_num after Get().
// The value is the actual CUDA device number + 1.
// NOTE(review): presumably 0 designates host memory, which would explain the
// offset by one -- confirm against the Plasma client documentation.
constexpr int kGpuDeviceNumber = 1;
874 
875 namespace {
876 
AssertCudaRead(const std::shared_ptr<Buffer> & buffer,const std::vector<uint8_t> & expected_data)877 void AssertCudaRead(const std::shared_ptr<Buffer>& buffer,
878                     const std::vector<uint8_t>& expected_data) {
879   std::shared_ptr<CudaBuffer> gpu_buffer;
880   const size_t data_size = expected_data.size();
881 
882   ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(buffer));
883   ASSERT_EQ(gpu_buffer->size(), data_size);
884 
885   CudaBufferReader reader(gpu_buffer);
886   std::vector<uint8_t> read_data(data_size);
887   ASSERT_OK_AND_EQ(data_size, reader.Read(data_size, read_data.data()));
888 
889   for (size_t i = 0; i < data_size; i++) {
890     ASSERT_EQ(read_data[i], expected_data[i]);
891   }
892 }
893 
894 }  // namespace
895 
// Creates an object with kGpuDeviceNumber as its device, writes a payload
// through a CudaBufferWriter, seals it, and checks that Get() returns the same
// data and metadata along with the expected device number.
TEST_F(TestPlasmaStore, GetGPUTest) {
  ObjectID object_id = random_object_id();
  std::vector<ObjectBuffer> results;

  // Before creation, Get() returns one entry that carries no data.
  ARROW_CHECK_OK(client_.Get({object_id}, 0, &results));
  ASSERT_EQ(results.size(), 1);
  ASSERT_FALSE(results[0].data);

  // Create the object and fill it via a CUDA buffer writer.
  uint8_t payload[] = {4, 5, 3, 1};
  int64_t payload_size = sizeof(payload);
  uint8_t metadata[] = {42};
  int64_t metadata_size = sizeof(metadata);
  std::shared_ptr<Buffer> created;
  std::shared_ptr<CudaBuffer> device_buffer;
  ARROW_CHECK_OK(client_.Create(object_id, payload_size, metadata, metadata_size,
                                &created, kGpuDeviceNumber));
  ASSERT_OK_AND_ASSIGN(device_buffer, CudaBuffer::FromBuffer(created));
  CudaBufferWriter writer(device_buffer);
  ARROW_CHECK_OK(writer.Write(payload, payload_size));
  ARROW_CHECK_OK(client_.Seal(object_id));

  // Fetch the sealed object again and inspect what comes back.
  results.clear();
  ARROW_CHECK_OK(client_.Get({object_id}, -1, &results));
  ASSERT_EQ(results.size(), 1);
  ASSERT_EQ(results[0].device_num, kGpuDeviceNumber);
  // Data must match the payload written above.
  AssertCudaRead(results[0].data, {4, 5, 3, 1});
  // Metadata must match what was passed to Create().
  AssertCudaRead(results[0].metadata, {42});
}
929 
// Checks batch deletion of objects created with kGpuDeviceNumber:
//  * deleting IDs that were never created succeeds,
//  * objects still held by another client (via Get) remain in the store after
//    a Delete request,
//  * once that client drops its buffers and issues Delete, the objects are
//    gone.
// Every Status returned by Delete() is checked, including the final one.
TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
  ObjectID object_id1 = random_object_id();
  ObjectID object_id2 = random_object_id();

  // Test for deleting nonexistent object.
  Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  // Test for the object being in local Plasma store.
  // First create and seal both objects.
  int64_t data_size = 100;
  uint8_t metadata[] = {5};
  int64_t metadata_size = sizeof(metadata);
  std::shared_ptr<Buffer> data;
  ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data,
                                kGpuDeviceNumber));
  ARROW_CHECK_OK(client_.Seal(object_id1));
  ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data,
                                kGpuDeviceNumber));
  ARROW_CHECK_OK(client_.Seal(object_id2));
  // Release the ref count of Create function.
  ARROW_CHECK_OK(client_.Release(object_id1));
  ARROW_CHECK_OK(client_.Release(object_id2));
  // Increase the ref count by calling Get using client2_.
  std::vector<ObjectBuffer> object_buffers;
  ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers));
  // Objects are still used by client2_.
  result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  // The objects are in use, so they should not be deleted right now.
  bool has_object = false;
  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
  ASSERT_TRUE(has_object);
  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
  ASSERT_TRUE(has_object);
  // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer).
  // client2_ won't send the release request immediately because the trigger
  // condition is not reached. The release is only added to release cache.
  object_buffers.clear();
  // Delete the objects. The returned status must be verified as well;
  // otherwise a failing Delete() would go unnoticed here.
  result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
  ASSERT_FALSE(has_object);
  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
  ASSERT_FALSE(has_object);
}
975 
// Repeatedly deletes and re-creates a small, fixed set of objects created with
// kGpuDeviceNumber, then deletes all of them in one batch.
//
// The objects carry no metadata, so a null metadata pointer with size 0 is
// passed to Create(); it is spelled `nullptr` rather than the literal 0.
TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) {
  const int64_t loop_times = 100;
  const int64_t object_num = 5;
  const int64_t data_size = 40;

  std::vector<ObjectID> object_ids;

  // Create the initial set of GPU objects: create -> seal -> release.
  for (int64_t i = 0; i < object_num; i++) {
    object_ids.push_back(random_object_id());
    // Taken after push_back and used only within this iteration, so the
    // reference is not invalidated by later vector growth.
    const ObjectID& object_id = object_ids[i];

    std::shared_ptr<Buffer> data;
    ARROW_CHECK_OK(
        client_.Create(object_id, data_size, nullptr, 0, &data, kGpuDeviceNumber));
    ARROW_CHECK_OK(client_.Seal(object_id));
    ARROW_CHECK_OK(client_.Release(object_id));
  }

  // Delete and create again, cycling through the IDs round-robin.
  for (int64_t i = 0; i < loop_times; i++) {
    const ObjectID& object_id = object_ids[i % object_num];

    ARROW_CHECK_OK(client_.Delete(object_id));

    std::shared_ptr<Buffer> data;
    ARROW_CHECK_OK(
        client_.Create(object_id, data_size, nullptr, 0, &data, kGpuDeviceNumber));
    ARROW_CHECK_OK(client_.Seal(object_id));
    ARROW_CHECK_OK(client_.Release(object_id));
  }

  // Delete all objects with a single batch request.
  ARROW_CHECK_OK(client_.Delete(object_ids));
}
1009 
// Regression test for ARROW-5924: a GPU buffer is allowed to persist after
// Release(). The buffers obtained from Create() and Get() below stay in scope
// while the object is released and finally deleted.
TEST_F(TestPlasmaStore, GPUBufferLifetime) {
  const int64_t data_size = 40;
  ObjectID object_id = random_object_id();

  // Create, seal and release the object; `created` remains alive afterwards.
  std::shared_ptr<Buffer> created;
  ARROW_CHECK_OK(
      client_.Create(object_id, data_size, nullptr, 0, &created, kGpuDeviceNumber));
  ARROW_CHECK_OK(client_.Seal(object_id));
  ARROW_CHECK_OK(client_.Release(object_id));

  // Two Get() calls, followed by two Release() calls.
  ObjectBuffer first_get;
  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &first_get));
  ObjectBuffer second_get;
  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &second_get));
  ARROW_CHECK_OK(client_.Release(object_id));
  ARROW_CHECK_OK(client_.Release(object_id));

  // One more Get()/Release() pair while the earlier buffers still exist.
  ObjectBuffer third_get;
  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &third_get));
  ARROW_CHECK_OK(client_.Release(object_id));

  // Deleting the object must succeed with all buffers still in scope.
  ARROW_CHECK_OK(client_.Delete(object_id));
}
1034 
// GPU counterpart of the two-client scenario: an object that client2_ creates
// with kGpuDeviceNumber and seals must be visible to client_, and after
// client_ disconnects client2_ must still be able to seal, query and read
// back an object it created beforehand.
TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
  ObjectID object_id = random_object_id();
  std::vector<ObjectBuffer> fetched;

  // Initially the first client must not know the object.
  bool present;
  ARROW_CHECK_OK(client_.Contains(object_id, &present));
  ASSERT_FALSE(present);

  // Part 1: the second client creates and seals the object ...
  int64_t data_size = 100;
  uint8_t metadata[] = {5};
  int64_t metadata_size = sizeof(metadata);
  std::shared_ptr<Buffer> buffer;
  ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size,
                                 &buffer, kGpuDeviceNumber));
  ARROW_CHECK_OK(client2_.Seal(object_id));
  // ... and the first client can fetch it and sees it as contained.
  ARROW_CHECK_OK(client_.Get({object_id}, -1, &fetched));
  ARROW_CHECK_OK(client_.Contains(object_id, &present));
  ASSERT_TRUE(present);

  // Part 2: a disconnect of one client must leave the other one functional.
  // The second client creates (but does not yet seal) a new object.
  object_id = random_object_id();
  ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size,
                                 &buffer, kGpuDeviceNumber));
  // The first client goes away in the meantime.
  ARROW_CHECK_OK(client_.Disconnect());
  // The second client can still seal, query and fetch its object.
  ARROW_CHECK_OK(client2_.Seal(object_id));
  fetched.clear();
  ARROW_CHECK_OK(client2_.Contains(object_id, &present));
  ASSERT_TRUE(present);
  ARROW_CHECK_OK(client2_.Get({object_id}, -1, &fetched));
  ASSERT_EQ(fetched.size(), 1);
  ASSERT_EQ(fetched[0].device_num, kGpuDeviceNumber);
  AssertCudaRead(fetched[0].metadata, {5});
}
1075 
1076 #endif  // PLASMA_CUDA
1077 
1078 }  // namespace plasma
1079 
main(int argc,char ** argv)1080 int main(int argc, char** argv) {
1081   ::testing::InitGoogleTest(&argc, argv);
1082   plasma::test_executable = std::string(argv[0]);
1083   return RUN_ALL_TESTS();
1084 }
1085