1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #ifdef HAVE_CONFIG_H
21 # include <config.h>
22 #endif
23
24 #include <arrow-glib/buffer.hpp>
25 #include <arrow-glib/error.hpp>
26
27 #ifdef HAVE_ARROW_CUDA
28 # include <arrow-cuda-glib/cuda.hpp>
29 #endif
30
31 #include <plasma-glib/client.hpp>
32 #include <plasma-glib/object.hpp>
33
34 G_BEGIN_DECLS
35
36 /**
37 * SECTION: client
38 * @section_id: client-classes
39 * @title: Client related classes
40 * @include: plasma-glib/plasma-glib.h
41 *
42 * #GPlasmaClientOptions is a class for customizing plasma store
43 * connection.
44 *
45 * #GPlasmaClientCreateOptions is a class for customizing object creation.
46 *
47 * #GPlasmaClient is a class for an interface with a plasma store.
48 *
49 * Since: 0.12.0
50 */
51
52 typedef struct GPlasmaClientCreatePrivate_ {
53 gint n_retries;
54 } GPlasmaClientOptionsPrivate;
55
56 enum {
57 PROP_N_RETRIES = 1
58 };
59
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientOptions,gplasma_client_options,G_TYPE_OBJECT)60 G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientOptions,
61 gplasma_client_options,
62 G_TYPE_OBJECT)
63
64 #define GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object) \
65 static_cast<GPlasmaClientOptionsPrivate *>( \
66 gplasma_client_options_get_instance_private( \
67 GPLASMA_CLIENT_OPTIONS(object)))
68
69 static void
70 gplasma_client_options_set_property(GObject *object,
71 guint prop_id,
72 const GValue *value,
73 GParamSpec *pspec)
74 {
75 auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
76
77 switch (prop_id) {
78 case PROP_N_RETRIES:
79 priv->n_retries = g_value_get_int(value);
80 break;
81 default:
82 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
83 break;
84 }
85 }
86
87 static void
gplasma_client_options_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)88 gplasma_client_options_get_property(GObject *object,
89 guint prop_id,
90 GValue *value,
91 GParamSpec *pspec)
92 {
93 auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
94
95 switch (prop_id) {
96 case PROP_N_RETRIES:
97 g_value_set_int(value, priv->n_retries);
98 break;
99 default:
100 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
101 break;
102 }
103 }
104
105 static void
gplasma_client_options_init(GPlasmaClientOptions * object)106 gplasma_client_options_init(GPlasmaClientOptions *object)
107 {
108 }
109
110 static void
gplasma_client_options_class_init(GPlasmaClientOptionsClass * klass)111 gplasma_client_options_class_init(GPlasmaClientOptionsClass *klass)
112 {
113 auto gobject_class = G_OBJECT_CLASS(klass);
114
115 gobject_class->set_property = gplasma_client_options_set_property;
116 gobject_class->get_property = gplasma_client_options_get_property;
117
118 GParamSpec *spec;
119 spec = g_param_spec_int("n-retries",
120 "N retries",
121 "The number of retries to connect plasma store. "
122 "-1 means that the system default value is used.",
123 -1,
124 G_MAXINT,
125 -1,
126 static_cast<GParamFlags>(G_PARAM_READWRITE |
127 G_PARAM_CONSTRUCT));
128 g_object_class_install_property(gobject_class, PROP_N_RETRIES, spec);
129 }
130
131 /**
132 * gplasma_client_options_new:
133 *
134 * Returns: A newly created #GPlasmaClientOptions.
135 *
136 * Since: 0.12.0
137 */
138 GPlasmaClientOptions *
gplasma_client_options_new(void)139 gplasma_client_options_new(void)
140 {
141 auto options = g_object_new(GPLASMA_TYPE_CLIENT_OPTIONS,
142 NULL);
143 return GPLASMA_CLIENT_OPTIONS(options);
144 }
145
146 /**
147 * gplasma_client_options_set_n_retries:
148 * @options: A #GPlasmaClientOptions.
149 * @n_retries: The number of retires on connect.
150 *
151 * Since: 0.12.0
152 */
153 void
gplasma_client_options_set_n_retries(GPlasmaClientOptions * options,gint n_retries)154 gplasma_client_options_set_n_retries(GPlasmaClientOptions *options,
155 gint n_retries)
156 {
157 auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
158 priv->n_retries = n_retries;
159 }
160
161 /**
162 * gplasma_client_options_get_n_retries:
163 * @options: A #GPlasmaClientOptions.
164 *
165 * Returns: The number of retries on connect.
166 *
167 * Since: 0.12.0
168 */
169 gint
gplasma_client_options_get_n_retries(GPlasmaClientOptions * options)170 gplasma_client_options_get_n_retries(GPlasmaClientOptions *options)
171 {
172 auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
173 return priv->n_retries;
174 }
175
176
177 typedef struct GPlasmaClientCreateOptionsPrivate_ {
178 guint8 *metadata;
179 gsize metadata_size;
180 gint gpu_device;
181 } GPlasmaClientCreateOptionsPrivate;
182
183 enum {
184 PROP_GPU_DEVICE = 1
185 };
186
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientCreateOptions,gplasma_client_create_options,G_TYPE_OBJECT)187 G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientCreateOptions,
188 gplasma_client_create_options,
189 G_TYPE_OBJECT)
190
191 #define GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object) \
192 static_cast<GPlasmaClientCreateOptionsPrivate *>( \
193 gplasma_client_create_options_get_instance_private( \
194 GPLASMA_CLIENT_CREATE_OPTIONS(object)))
195
196 static void
197 gplasma_client_create_options_set_property(GObject *object,
198 guint prop_id,
199 const GValue *value,
200 GParamSpec *pspec)
201 {
202 auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
203
204 switch (prop_id) {
205 case PROP_GPU_DEVICE:
206 priv->gpu_device = g_value_get_int(value);
207 break;
208 default:
209 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
210 break;
211 }
212 }
213
214 static void
gplasma_client_create_options_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)215 gplasma_client_create_options_get_property(GObject *object,
216 guint prop_id,
217 GValue *value,
218 GParamSpec *pspec)
219 {
220 auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
221
222 switch (prop_id) {
223 case PROP_GPU_DEVICE:
224 g_value_set_int(value, priv->gpu_device);
225 break;
226 default:
227 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
228 break;
229 }
230 }
231
232 static void
gplasma_client_create_options_init(GPlasmaClientCreateOptions * object)233 gplasma_client_create_options_init(GPlasmaClientCreateOptions *object)
234 {
235 }
236
237 static void
gplasma_client_create_options_class_init(GPlasmaClientCreateOptionsClass * klass)238 gplasma_client_create_options_class_init(GPlasmaClientCreateOptionsClass *klass)
239 {
240 auto gobject_class = G_OBJECT_CLASS(klass);
241
242 gobject_class->set_property = gplasma_client_create_options_set_property;
243 gobject_class->get_property = gplasma_client_create_options_get_property;
244
245 GParamSpec *spec;
246 spec = g_param_spec_int("gpu-device",
247 "GPU device",
248 "The GPU device number. -1 means GPU isn't used.",
249 -1,
250 G_MAXINT,
251 -1,
252 static_cast<GParamFlags>(G_PARAM_READWRITE |
253 G_PARAM_CONSTRUCT));
254 g_object_class_install_property(gobject_class, PROP_GPU_DEVICE, spec);
255 }
256
257 /**
258 * gplasma_client_create_options_new:
259 *
260 * Returns: A newly created #GPlasmaClientCreateOptions.
261 *
262 * Since: 0.12.0
263 */
264 GPlasmaClientCreateOptions *
gplasma_client_create_options_new(void)265 gplasma_client_create_options_new(void)
266 {
267 auto options = g_object_new(GPLASMA_TYPE_CLIENT_CREATE_OPTIONS,
268 NULL);
269 return GPLASMA_CLIENT_CREATE_OPTIONS(options);
270 }
271
272 /**
273 * gplasma_client_create_options_set_metadata:
274 * @options: A #GPlasmaClientCreateOptions.
275 * @metadata: (nullable) (array length=size): The metadata of a created object.
276 * @size: The number of bytes of the metadata.
277 *
278 * Since: 0.12.0
279 */
280 void
gplasma_client_create_options_set_metadata(GPlasmaClientCreateOptions * options,const guint8 * metadata,gsize size)281 gplasma_client_create_options_set_metadata(GPlasmaClientCreateOptions *options,
282 const guint8 *metadata,
283 gsize size)
284 {
285 auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
286 if (priv->metadata) {
287 g_free(priv->metadata);
288 }
289 priv->metadata = static_cast<guint8 *>(g_memdup(metadata, size));
290 priv->metadata_size = size;
291 }
292
293 /**
294 * gplasma_client_create_options_get_metadata:
295 * @options: A #GPlasmaClientCreateOptions.
296 * @size: (nullable) (out): The number of bytes of the metadata.
297 *
298 * Returns: (nullable) (array length=size): The metadata of a created object.
299 *
300 * Since: 0.12.0
301 */
302 const guint8 *
gplasma_client_create_options_get_metadata(GPlasmaClientCreateOptions * options,gsize * size)303 gplasma_client_create_options_get_metadata(GPlasmaClientCreateOptions *options,
304 gsize *size)
305 {
306 auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
307 if (size) {
308 *size = priv->metadata_size;
309 }
310 return priv->metadata;
311 }
312
313
314 typedef struct GPlasmaClientPrivate_ {
315 plasma::PlasmaClient *client;
316 bool disconnected;
317 } GPlasmaClientPrivate;
318
319 enum {
320 PROP_CLIENT = 1
321 };
322
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClient,gplasma_client,G_TYPE_OBJECT)323 G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClient,
324 gplasma_client,
325 G_TYPE_OBJECT)
326
327 #define GPLASMA_CLIENT_GET_PRIVATE(object) \
328 static_cast<GPlasmaClientPrivate *>( \
329 gplasma_client_get_instance_private( \
330 GPLASMA_CLIENT(object)))
331
332 static void
333 gplasma_client_finalize(GObject *object)
334 {
335 auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
336
337 if (!priv->disconnected) {
338 auto status = priv->client->Disconnect();
339 if (!status.ok()) {
340 g_warning("[plasma][client][finalize] Failed to disconnect: %s",
341 status.ToString().c_str());
342 }
343 }
344 delete priv->client;
345
346 G_OBJECT_CLASS(gplasma_client_parent_class)->finalize(object);
347 }
348
349 static void
gplasma_client_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)350 gplasma_client_set_property(GObject *object,
351 guint prop_id,
352 const GValue *value,
353 GParamSpec *pspec)
354 {
355 auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
356
357 switch (prop_id) {
358 case PROP_CLIENT:
359 priv->client =
360 static_cast<plasma::PlasmaClient *>(g_value_get_pointer(value));
361 break;
362 default:
363 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
364 break;
365 }
366 }
367
368 static void
gplasma_client_init(GPlasmaClient * object)369 gplasma_client_init(GPlasmaClient *object)
370 {
371 }
372
373 static void
gplasma_client_class_init(GPlasmaClientClass * klass)374 gplasma_client_class_init(GPlasmaClientClass *klass)
375 {
376 GParamSpec *spec;
377
378 auto gobject_class = G_OBJECT_CLASS(klass);
379
380 gobject_class->finalize = gplasma_client_finalize;
381 gobject_class->set_property = gplasma_client_set_property;
382
383 spec = g_param_spec_pointer("client",
384 "Client",
385 "The raw plasma::PlasmaClient *",
386 static_cast<GParamFlags>(G_PARAM_WRITABLE |
387 G_PARAM_CONSTRUCT_ONLY));
388 g_object_class_install_property(gobject_class, PROP_CLIENT, spec);
389 }
390
391 /**
392 * gplasma_client_new:
393 * @store_socket_name: The name of the UNIX domain socket.
394 * @options: (nullable): The options to custom how to connect to plasma store.
395 * @error: (nullable): Return location for a #GError or %NULL.
396 *
397 * Returns: (nullable): A newly created #GPlasmaClient on success,
398 * %NULL on error.
399 *
400 * Since: 0.12.0
401 */
402 GPlasmaClient *
gplasma_client_new(const gchar * store_socket_name,GPlasmaClientOptions * options,GError ** error)403 gplasma_client_new(const gchar *store_socket_name,
404 GPlasmaClientOptions *options,
405 GError **error)
406 {
407 auto plasma_client = new plasma::PlasmaClient();
408 int n_retries = -1;
409 if (options) {
410 n_retries = gplasma_client_options_get_n_retries(options);
411 }
412 auto status = plasma_client->Connect(store_socket_name, "", 0, n_retries);
413 if (garrow_error_check(error, status, "[plasma][client][new]")) {
414 return gplasma_client_new_raw(plasma_client);
415 } else {
416 return NULL;
417 }
418 }
419
420 /**
421 * gplasma_client_create:
422 * @client: A #GPlasmaClient.
423 * @id: The ID for a newly created object.
424 * @data_size: The number of bytes of data for a newly created object.
425 * @options: (nullable): The option for creating an object.
426 * @error: (nullable): Return location for a #GError or %NULL.
427 *
428 * Returns: (nullable) (transfer full): A newly created #GPlasmaCreatedObject
429 * on success, %NULL on error.
430 *
431 * Since: 0.12.0
432 */
433 GPlasmaCreatedObject *
gplasma_client_create(GPlasmaClient * client,GPlasmaObjectID * id,gsize data_size,GPlasmaClientCreateOptions * options,GError ** error)434 gplasma_client_create(GPlasmaClient *client,
435 GPlasmaObjectID *id,
436 gsize data_size,
437 GPlasmaClientCreateOptions *options,
438 GError **error)
439 {
440 const auto context = "[plasma][client][create]";
441 auto plasma_client = gplasma_client_get_raw(client);
442 auto plasma_id = gplasma_object_id_get_raw(id);
443 const uint8_t *raw_metadata = nullptr;
444 int64_t raw_metadata_size = 0;
445 int device_number = 0;
446 if (options) {
447 auto options_priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
448 raw_metadata = options_priv->metadata;
449 raw_metadata_size = options_priv->metadata_size;
450 if (options_priv->gpu_device >= 0) {
451 #ifndef HAVE_ARROW_CUDA
452 g_set_error(error,
453 GARROW_ERROR,
454 GARROW_ERROR_INVALID,
455 "%s Arrow CUDA GLib is needed to use GPU",
456 context);
457 return NULL;
458 #endif
459 device_number = options_priv->gpu_device + 1;
460 }
461 }
462 std::shared_ptr<arrow::Buffer> plasma_data;
463 auto status = plasma_client->Create(plasma_id,
464 data_size,
465 raw_metadata,
466 raw_metadata_size,
467 &plasma_data,
468 device_number);
469 if (!garrow_error_check(error, status, context)) {
470 return NULL;
471 }
472
473 GArrowBuffer *data = nullptr;
474 if (device_number == 0) {
475 auto plasma_mutable_data =
476 std::static_pointer_cast<arrow::MutableBuffer>(plasma_data);
477 data = GARROW_BUFFER(garrow_mutable_buffer_new_raw(&plasma_mutable_data));
478 #ifdef HAVE_ARROW_CUDA
479 } else {
480 auto plasma_cuda_data =
481 std::static_pointer_cast<arrow::cuda::CudaBuffer>(plasma_data);
482 data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_data));
483 #endif
484 }
485 std::shared_ptr<arrow::Buffer> plasma_metadata;
486 GArrowBuffer *metadata = nullptr;
487 if (raw_metadata_size > 0) {
488 plasma_metadata =
489 std::make_shared<arrow::Buffer>(raw_metadata, raw_metadata_size);
490 metadata = garrow_buffer_new_raw(&plasma_metadata);
491 }
492 return gplasma_created_object_new_raw(client,
493 id,
494 &plasma_data,
495 data,
496 metadata ? &plasma_metadata : nullptr,
497 metadata,
498 device_number - 1);
499 }
500
501 /**
502 * gplasma_client_refer_object:
503 * @client: A #GPlasmaClient.
504 * @id: The ID of the target object.
505 * @timeout_ms: The timeout in milliseconds. -1 means no timeout.
506 * @error: (nullable): Return location for a #GError or %NULL.
507 *
508 * Returns: (nullable) (transfer full): A found #GPlasmaReferredObject
509 * on success, %NULL on error.
510 *
511 * Since: 0.12.0
512 */
513 GPlasmaReferredObject *
gplasma_client_refer_object(GPlasmaClient * client,GPlasmaObjectID * id,gint64 timeout_ms,GError ** error)514 gplasma_client_refer_object(GPlasmaClient *client,
515 GPlasmaObjectID *id,
516 gint64 timeout_ms,
517 GError **error)
518 {
519 const auto context = "[plasma][client][refer-object]";
520 auto plasma_client = gplasma_client_get_raw(client);
521 auto plasma_id = gplasma_object_id_get_raw(id);
522 std::vector<plasma::ObjectID> plasma_ids;
523 plasma_ids.push_back(plasma_id);
524 std::vector<plasma::ObjectBuffer> plasma_object_buffers;
525 auto status = plasma_client->Get(plasma_ids,
526 timeout_ms,
527 &plasma_object_buffers);
528 if (!garrow_error_check(error, status, context)) {
529 return NULL;
530 }
531
532 auto plasma_object_buffer = plasma_object_buffers[0];
533 auto plasma_data = plasma_object_buffer.data;
534 auto plasma_metadata = plasma_object_buffer.metadata;
535 GArrowBuffer *data = nullptr;
536 GArrowBuffer *metadata = nullptr;
537 if (plasma_object_buffer.device_num == 0) {
538 data = garrow_buffer_new_raw(&plasma_data);
539 metadata = garrow_buffer_new_raw(&plasma_metadata);
540 } else {
541 #ifdef HAVE_ARROW_CUDA
542 auto plasma_cuda_data = arrow::cuda::CudaBuffer::FromBuffer(plasma_data);
543 if (!garrow::check(error, plasma_cuda_data, context)) {
544 return NULL;
545 }
546 auto plasma_cuda_metadata =
547 arrow::cuda::CudaBuffer::FromBuffer(plasma_metadata);
548 if (!garrow::check(error, plasma_cuda_metadata, context)) {
549 return NULL;
550 }
551
552 data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_data)));
553 metadata =
554 GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_metadata)));
555 #else
556 g_set_error(error,
557 GARROW_ERROR,
558 GARROW_ERROR_INVALID,
559 "%s Arrow CUDA GLib is needed to use GPU",
560 context);
561 return NULL;
562 #endif
563 }
564 return gplasma_referred_object_new_raw(client,
565 id,
566 &plasma_data,
567 data,
568 &plasma_metadata,
569 metadata,
570 plasma_object_buffer.device_num - 1);
571 }
572
573 /**
574 * gplasma_client_disconnect:
575 * @client: A #GPlasmaClient.
576 * @error: (nullable): Return location for a #GError or %NULL.
577 *
578 * Returns: %TRUE on success, %FALSE if there was an error.
579 *
580 * Since: 0.12.0
581 */
582 gboolean
gplasma_client_disconnect(GPlasmaClient * client,GError ** error)583 gplasma_client_disconnect(GPlasmaClient *client,
584 GError **error)
585 {
586 auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
587 auto status = priv->client->Disconnect();
588 if (garrow_error_check(error, status, "[plasma][client][disconnect]")) {
589 priv->disconnected = true;
590 return TRUE;
591 } else {
592 return FALSE;
593 }
594 }
595
596 G_END_DECLS
597
598 GPlasmaClient *
gplasma_client_new_raw(plasma::PlasmaClient * plasma_client)599 gplasma_client_new_raw(plasma::PlasmaClient *plasma_client)
600 {
601 auto client = g_object_new(GPLASMA_TYPE_CLIENT,
602 "client", plasma_client,
603 NULL);
604 return GPLASMA_CLIENT(client);
605 }
606
607 plasma::PlasmaClient *
gplasma_client_get_raw(GPlasmaClient * client)608 gplasma_client_get_raw(GPlasmaClient *client)
609 {
610 auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
611 return priv->client;
612 }
613