1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifdef HAVE_CONFIG_H
21 #  include <config.h>
22 #endif
23 
24 #include <arrow-glib/buffer.hpp>
25 #include <arrow-glib/error.hpp>
26 
27 #ifdef HAVE_ARROW_CUDA
28 #  include <arrow-cuda-glib/cuda.hpp>
29 #endif
30 
31 #include <plasma-glib/client.hpp>
32 #include <plasma-glib/object.hpp>
33 
34 G_BEGIN_DECLS
35 
36 /**
37  * SECTION: client
38  * @section_id: client-classes
39  * @title: Client related classes
40  * @include: plasma-glib/plasma-glib.h
41  *
42  * #GPlasmaClientOptions is a class for customizing plasma store
43  * connection.
44  *
45  * #GPlasmaClientCreateOptions is a class for customizing object creation.
46  *
47  * #GPlasmaClient is a class for an interface with a plasma store.
48  *
49  * Since: 0.12.0
50  */
51 
52 typedef struct GPlasmaClientCreatePrivate_ {
53   gint n_retries;
54 } GPlasmaClientOptionsPrivate;
55 
56 enum {
57   PROP_N_RETRIES = 1
58 };
59 
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientOptions,gplasma_client_options,G_TYPE_OBJECT)60 G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientOptions,
61                            gplasma_client_options,
62                            G_TYPE_OBJECT)
63 
64 #define GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object)      \
65   static_cast<GPlasmaClientOptionsPrivate *>(           \
66     gplasma_client_options_get_instance_private(        \
67       GPLASMA_CLIENT_OPTIONS(object)))
68 
69 static void
70 gplasma_client_options_set_property(GObject *object,
71                                     guint prop_id,
72                                     const GValue *value,
73                                     GParamSpec *pspec)
74 {
75   auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
76 
77   switch (prop_id) {
78   case PROP_N_RETRIES:
79     priv->n_retries = g_value_get_int(value);
80     break;
81   default:
82     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
83     break;
84   }
85 }
86 
87 static void
gplasma_client_options_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)88 gplasma_client_options_get_property(GObject *object,
89                                     guint prop_id,
90                                     GValue *value,
91                                     GParamSpec *pspec)
92 {
93   auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
94 
95   switch (prop_id) {
96   case PROP_N_RETRIES:
97     g_value_set_int(value, priv->n_retries);
98     break;
99   default:
100     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
101     break;
102   }
103 }
104 
105 static void
gplasma_client_options_init(GPlasmaClientOptions * object)106 gplasma_client_options_init(GPlasmaClientOptions *object)
107 {
108 }
109 
110 static void
gplasma_client_options_class_init(GPlasmaClientOptionsClass * klass)111 gplasma_client_options_class_init(GPlasmaClientOptionsClass *klass)
112 {
113   auto gobject_class = G_OBJECT_CLASS(klass);
114 
115   gobject_class->set_property = gplasma_client_options_set_property;
116   gobject_class->get_property = gplasma_client_options_get_property;
117 
118   GParamSpec *spec;
119   spec = g_param_spec_int("n-retries",
120                           "N retries",
121                           "The number of retries to connect plasma store. "
122                           "-1 means that the system default value is used.",
123                           -1,
124                           G_MAXINT,
125                           -1,
126                           static_cast<GParamFlags>(G_PARAM_READWRITE |
127                                                    G_PARAM_CONSTRUCT));
128   g_object_class_install_property(gobject_class, PROP_N_RETRIES, spec);
129 }
130 
131 /**
132  * gplasma_client_options_new:
133  *
134  * Returns: A newly created #GPlasmaClientOptions.
135  *
136  * Since: 0.12.0
137  */
138 GPlasmaClientOptions *
gplasma_client_options_new(void)139 gplasma_client_options_new(void)
140 {
141   auto options = g_object_new(GPLASMA_TYPE_CLIENT_OPTIONS,
142                               NULL);
143   return GPLASMA_CLIENT_OPTIONS(options);
144 }
145 
146 /**
147  * gplasma_client_options_set_n_retries:
148  * @options: A #GPlasmaClientOptions.
149  * @n_retries: The number of retires on connect.
150  *
151  * Since: 0.12.0
152  */
153 void
gplasma_client_options_set_n_retries(GPlasmaClientOptions * options,gint n_retries)154 gplasma_client_options_set_n_retries(GPlasmaClientOptions *options,
155                                      gint n_retries)
156 {
157   auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
158   priv->n_retries = n_retries;
159 }
160 
161 /**
162  * gplasma_client_options_get_n_retries:
163  * @options: A #GPlasmaClientOptions.
164  *
165  * Returns: The number of retries on connect.
166  *
167  * Since: 0.12.0
168  */
169 gint
gplasma_client_options_get_n_retries(GPlasmaClientOptions * options)170 gplasma_client_options_get_n_retries(GPlasmaClientOptions *options)
171 {
172   auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
173   return priv->n_retries;
174 }
175 
176 
177 typedef struct GPlasmaClientCreateOptionsPrivate_ {
178   guint8 *metadata;
179   gsize metadata_size;
180   gint gpu_device;
181 } GPlasmaClientCreateOptionsPrivate;
182 
183 enum {
184   PROP_GPU_DEVICE = 1
185 };
186 
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientCreateOptions,gplasma_client_create_options,G_TYPE_OBJECT)187 G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientCreateOptions,
188                            gplasma_client_create_options,
189                            G_TYPE_OBJECT)
190 
191 #define GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object)         \
192   static_cast<GPlasmaClientCreateOptionsPrivate *>(               \
193     gplasma_client_create_options_get_instance_private(           \
194       GPLASMA_CLIENT_CREATE_OPTIONS(object)))
195 
196 static void
197 gplasma_client_create_options_set_property(GObject *object,
198                                            guint prop_id,
199                                            const GValue *value,
200                                            GParamSpec *pspec)
201 {
202   auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
203 
204   switch (prop_id) {
205   case PROP_GPU_DEVICE:
206     priv->gpu_device = g_value_get_int(value);
207     break;
208   default:
209     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
210     break;
211   }
212 }
213 
214 static void
gplasma_client_create_options_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)215 gplasma_client_create_options_get_property(GObject *object,
216                                            guint prop_id,
217                                            GValue *value,
218                                            GParamSpec *pspec)
219 {
220   auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
221 
222   switch (prop_id) {
223   case PROP_GPU_DEVICE:
224     g_value_set_int(value, priv->gpu_device);
225     break;
226   default:
227     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
228     break;
229   }
230 }
231 
232 static void
gplasma_client_create_options_init(GPlasmaClientCreateOptions * object)233 gplasma_client_create_options_init(GPlasmaClientCreateOptions *object)
234 {
235 }
236 
237 static void
gplasma_client_create_options_class_init(GPlasmaClientCreateOptionsClass * klass)238 gplasma_client_create_options_class_init(GPlasmaClientCreateOptionsClass *klass)
239 {
240   auto gobject_class = G_OBJECT_CLASS(klass);
241 
242   gobject_class->set_property = gplasma_client_create_options_set_property;
243   gobject_class->get_property = gplasma_client_create_options_get_property;
244 
245   GParamSpec *spec;
246   spec = g_param_spec_int("gpu-device",
247                           "GPU device",
248                           "The GPU device number. -1 means GPU isn't used.",
249                           -1,
250                           G_MAXINT,
251                           -1,
252                           static_cast<GParamFlags>(G_PARAM_READWRITE |
253                                                    G_PARAM_CONSTRUCT));
254   g_object_class_install_property(gobject_class, PROP_GPU_DEVICE, spec);
255 }
256 
257 /**
258  * gplasma_client_create_options_new:
259  *
260  * Returns: A newly created #GPlasmaClientCreateOptions.
261  *
262  * Since: 0.12.0
263  */
264 GPlasmaClientCreateOptions *
gplasma_client_create_options_new(void)265 gplasma_client_create_options_new(void)
266 {
267   auto options = g_object_new(GPLASMA_TYPE_CLIENT_CREATE_OPTIONS,
268                               NULL);
269   return GPLASMA_CLIENT_CREATE_OPTIONS(options);
270 }
271 
272 /**
273  * gplasma_client_create_options_set_metadata:
274  * @options: A #GPlasmaClientCreateOptions.
275  * @metadata: (nullable) (array length=size): The metadata of a created object.
276  * @size: The number of bytes of the metadata.
277  *
278  * Since: 0.12.0
279  */
280 void
gplasma_client_create_options_set_metadata(GPlasmaClientCreateOptions * options,const guint8 * metadata,gsize size)281 gplasma_client_create_options_set_metadata(GPlasmaClientCreateOptions *options,
282                                            const guint8 *metadata,
283                                            gsize size)
284 {
285   auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
286   if (priv->metadata) {
287     g_free(priv->metadata);
288   }
289   priv->metadata = static_cast<guint8 *>(g_memdup(metadata, size));
290   priv->metadata_size = size;
291 }
292 
293 /**
294  * gplasma_client_create_options_get_metadata:
295  * @options: A #GPlasmaClientCreateOptions.
296  * @size: (nullable) (out): The number of bytes of the metadata.
297  *
298  * Returns: (nullable) (array length=size): The metadata of a created object.
299  *
300  * Since: 0.12.0
301  */
302 const guint8 *
gplasma_client_create_options_get_metadata(GPlasmaClientCreateOptions * options,gsize * size)303 gplasma_client_create_options_get_metadata(GPlasmaClientCreateOptions *options,
304                                            gsize *size)
305 {
306   auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
307   if (size) {
308     *size = priv->metadata_size;
309   }
310   return priv->metadata;
311 }
312 
313 
314 typedef struct GPlasmaClientPrivate_ {
315   plasma::PlasmaClient *client;
316   bool disconnected;
317 } GPlasmaClientPrivate;
318 
319 enum {
320   PROP_CLIENT = 1
321 };
322 
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClient,gplasma_client,G_TYPE_OBJECT)323 G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClient,
324                            gplasma_client,
325                            G_TYPE_OBJECT)
326 
327 #define GPLASMA_CLIENT_GET_PRIVATE(object)         \
328   static_cast<GPlasmaClientPrivate *>(             \
329     gplasma_client_get_instance_private(           \
330       GPLASMA_CLIENT(object)))
331 
332 static void
333 gplasma_client_finalize(GObject *object)
334 {
335   auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
336 
337   if (!priv->disconnected) {
338     auto status = priv->client->Disconnect();
339     if (!status.ok()) {
340       g_warning("[plasma][client][finalize] Failed to disconnect: %s",
341                 status.ToString().c_str());
342     }
343   }
344   delete priv->client;
345 
346   G_OBJECT_CLASS(gplasma_client_parent_class)->finalize(object);
347 }
348 
349 static void
gplasma_client_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)350 gplasma_client_set_property(GObject *object,
351                             guint prop_id,
352                             const GValue *value,
353                             GParamSpec *pspec)
354 {
355   auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
356 
357   switch (prop_id) {
358   case PROP_CLIENT:
359     priv->client =
360       static_cast<plasma::PlasmaClient *>(g_value_get_pointer(value));
361     break;
362   default:
363     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
364     break;
365   }
366 }
367 
368 static void
gplasma_client_init(GPlasmaClient * object)369 gplasma_client_init(GPlasmaClient *object)
370 {
371 }
372 
373 static void
gplasma_client_class_init(GPlasmaClientClass * klass)374 gplasma_client_class_init(GPlasmaClientClass *klass)
375 {
376   GParamSpec *spec;
377 
378   auto gobject_class = G_OBJECT_CLASS(klass);
379 
380   gobject_class->finalize     = gplasma_client_finalize;
381   gobject_class->set_property = gplasma_client_set_property;
382 
383   spec = g_param_spec_pointer("client",
384                               "Client",
385                               "The raw plasma::PlasmaClient *",
386                               static_cast<GParamFlags>(G_PARAM_WRITABLE |
387                                                        G_PARAM_CONSTRUCT_ONLY));
388   g_object_class_install_property(gobject_class, PROP_CLIENT, spec);
389 }
390 
391 /**
392  * gplasma_client_new:
393  * @store_socket_name: The name of the UNIX domain socket.
394  * @options: (nullable): The options to custom how to connect to plasma store.
395  * @error: (nullable): Return location for a #GError or %NULL.
396  *
397  * Returns: (nullable): A newly created #GPlasmaClient on success,
398  *   %NULL on error.
399  *
400  * Since: 0.12.0
401  */
402 GPlasmaClient *
gplasma_client_new(const gchar * store_socket_name,GPlasmaClientOptions * options,GError ** error)403 gplasma_client_new(const gchar *store_socket_name,
404                    GPlasmaClientOptions *options,
405                    GError **error)
406 {
407   auto plasma_client = new plasma::PlasmaClient();
408   int n_retries = -1;
409   if (options) {
410     n_retries = gplasma_client_options_get_n_retries(options);
411   }
412   auto status = plasma_client->Connect(store_socket_name, "", 0, n_retries);
413   if (garrow_error_check(error, status, "[plasma][client][new]")) {
414     return gplasma_client_new_raw(plasma_client);
415   } else {
416     return NULL;
417   }
418 }
419 
420 /**
421  * gplasma_client_create:
422  * @client: A #GPlasmaClient.
423  * @id: The ID for a newly created object.
424  * @data_size: The number of bytes of data for a newly created object.
425  * @options: (nullable): The option for creating an object.
426  * @error: (nullable): Return location for a #GError or %NULL.
427  *
428  * Returns: (nullable) (transfer full): A newly created #GPlasmaCreatedObject
429  *   on success, %NULL on error.
430  *
431  * Since: 0.12.0
432  */
433 GPlasmaCreatedObject *
gplasma_client_create(GPlasmaClient * client,GPlasmaObjectID * id,gsize data_size,GPlasmaClientCreateOptions * options,GError ** error)434 gplasma_client_create(GPlasmaClient *client,
435                       GPlasmaObjectID *id,
436                       gsize data_size,
437                       GPlasmaClientCreateOptions *options,
438                       GError **error)
439 {
440   const auto context = "[plasma][client][create]";
441   auto plasma_client = gplasma_client_get_raw(client);
442   auto plasma_id = gplasma_object_id_get_raw(id);
443   const uint8_t *raw_metadata = nullptr;
444   int64_t raw_metadata_size = 0;
445   int device_number = 0;
446   if (options) {
447     auto options_priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
448     raw_metadata = options_priv->metadata;
449     raw_metadata_size = options_priv->metadata_size;
450     if (options_priv->gpu_device >= 0) {
451 #ifndef HAVE_ARROW_CUDA
452       g_set_error(error,
453                   GARROW_ERROR,
454                   GARROW_ERROR_INVALID,
455                   "%s Arrow CUDA GLib is needed to use GPU",
456                   context);
457       return NULL;
458 #endif
459       device_number = options_priv->gpu_device + 1;
460     }
461   }
462   std::shared_ptr<arrow::Buffer> plasma_data;
463   auto status = plasma_client->Create(plasma_id,
464                                       data_size,
465                                       raw_metadata,
466                                       raw_metadata_size,
467                                       &plasma_data,
468                                       device_number);
469   if (!garrow_error_check(error, status, context)) {
470     return NULL;
471   }
472 
473   GArrowBuffer *data = nullptr;
474   if (device_number == 0) {
475     auto plasma_mutable_data =
476       std::static_pointer_cast<arrow::MutableBuffer>(plasma_data);
477     data = GARROW_BUFFER(garrow_mutable_buffer_new_raw(&plasma_mutable_data));
478 #ifdef HAVE_ARROW_CUDA
479   } else {
480     auto plasma_cuda_data =
481       std::static_pointer_cast<arrow::cuda::CudaBuffer>(plasma_data);
482     data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_data));
483 #endif
484   }
485   std::shared_ptr<arrow::Buffer> plasma_metadata;
486   GArrowBuffer *metadata = nullptr;
487   if (raw_metadata_size > 0) {
488     plasma_metadata =
489       std::make_shared<arrow::Buffer>(raw_metadata, raw_metadata_size);
490     metadata = garrow_buffer_new_raw(&plasma_metadata);
491   }
492   return gplasma_created_object_new_raw(client,
493                                         id,
494                                         &plasma_data,
495                                         data,
496                                         metadata ? &plasma_metadata : nullptr,
497                                         metadata,
498                                         device_number - 1);
499 }
500 
501 /**
502  * gplasma_client_refer_object:
503  * @client: A #GPlasmaClient.
504  * @id: The ID of the target object.
505  * @timeout_ms: The timeout in milliseconds. -1 means no timeout.
506  * @error: (nullable): Return location for a #GError or %NULL.
507  *
508  * Returns: (nullable) (transfer full): A found #GPlasmaReferredObject
509  *   on success, %NULL on error.
510  *
511  * Since: 0.12.0
512  */
513 GPlasmaReferredObject *
gplasma_client_refer_object(GPlasmaClient * client,GPlasmaObjectID * id,gint64 timeout_ms,GError ** error)514 gplasma_client_refer_object(GPlasmaClient *client,
515                             GPlasmaObjectID *id,
516                             gint64 timeout_ms,
517                             GError **error)
518 {
519   const auto context = "[plasma][client][refer-object]";
520   auto plasma_client = gplasma_client_get_raw(client);
521   auto plasma_id = gplasma_object_id_get_raw(id);
522   std::vector<plasma::ObjectID> plasma_ids;
523   plasma_ids.push_back(plasma_id);
524   std::vector<plasma::ObjectBuffer> plasma_object_buffers;
525   auto status = plasma_client->Get(plasma_ids,
526                                    timeout_ms,
527                                    &plasma_object_buffers);
528   if (!garrow_error_check(error, status, context)) {
529     return NULL;
530   }
531 
532   auto plasma_object_buffer = plasma_object_buffers[0];
533   auto plasma_data = plasma_object_buffer.data;
534   auto plasma_metadata = plasma_object_buffer.metadata;
535   GArrowBuffer *data = nullptr;
536   GArrowBuffer *metadata = nullptr;
537   if (plasma_object_buffer.device_num == 0) {
538     data = garrow_buffer_new_raw(&plasma_data);
539     metadata = garrow_buffer_new_raw(&plasma_metadata);
540   } else {
541 #ifdef HAVE_ARROW_CUDA
542     auto plasma_cuda_data = arrow::cuda::CudaBuffer::FromBuffer(plasma_data);
543     if (!garrow::check(error, plasma_cuda_data, context)) {
544       return NULL;
545     }
546     auto plasma_cuda_metadata =
547       arrow::cuda::CudaBuffer::FromBuffer(plasma_metadata);
548     if (!garrow::check(error, plasma_cuda_metadata, context)) {
549       return NULL;
550     }
551 
552     data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_data)));
553     metadata =
554       GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_metadata)));
555 #else
556     g_set_error(error,
557                 GARROW_ERROR,
558                 GARROW_ERROR_INVALID,
559                 "%s Arrow CUDA GLib is needed to use GPU",
560                 context);
561     return NULL;
562 #endif
563   }
564   return gplasma_referred_object_new_raw(client,
565                                          id,
566                                          &plasma_data,
567                                          data,
568                                          &plasma_metadata,
569                                          metadata,
570                                          plasma_object_buffer.device_num - 1);
571 }
572 
573 /**
574  * gplasma_client_disconnect:
575  * @client: A #GPlasmaClient.
576  * @error: (nullable): Return location for a #GError or %NULL.
577  *
578  * Returns: %TRUE on success, %FALSE if there was an error.
579  *
580  * Since: 0.12.0
581  */
582 gboolean
gplasma_client_disconnect(GPlasmaClient * client,GError ** error)583 gplasma_client_disconnect(GPlasmaClient *client,
584                           GError **error)
585 {
586   auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
587   auto status = priv->client->Disconnect();
588   if (garrow_error_check(error, status, "[plasma][client][disconnect]")) {
589     priv->disconnected = true;
590     return TRUE;
591   } else {
592     return FALSE;
593   }
594 }
595 
596 G_END_DECLS
597 
598 GPlasmaClient *
gplasma_client_new_raw(plasma::PlasmaClient * plasma_client)599 gplasma_client_new_raw(plasma::PlasmaClient *plasma_client)
600 {
601   auto client = g_object_new(GPLASMA_TYPE_CLIENT,
602                              "client", plasma_client,
603                              NULL);
604   return GPLASMA_CLIENT(client);
605 }
606 
607 plasma::PlasmaClient *
gplasma_client_get_raw(GPlasmaClient * client)608 gplasma_client_get_raw(GPlasmaClient *client)
609 {
610   auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
611   return priv->client;
612 }
613