1 /*
2 * vi: set autoindent tabstop=4 shiftwidth=4 :
3 *
4 * Copyright (c) 2006-2015 Red Hat, Inc.
5 *
6 * All rights reserved.
7 *
8 * Author: Christine Caulfield (ccaulfi@redhat.com)
9 * Author: Jan Friesse (jfriesse@redhat.com)
10 *
11 * This software licensed under BSD license, the text of which follows:
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
36 */
37 /*
38 * Provides a closed process group API using the coroipcc executive
39 */
40
41 #include <config.h>
42
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #include <sys/mman.h>
50 #include <sys/uio.h>
51 #include <sys/stat.h>
52 #include <errno.h>
53 #include <limits.h>
54
55 #include <qb/qbdefs.h>
56 #include <qb/qbipcc.h>
57 #include <qb/qblog.h>
58
59 #include <corosync/hdb.h>
60 #include <corosync/list.h>
61 #include <corosync/corotypes.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/cpg.h>
64 #include <corosync/ipc_cpg.h>
65
66 #include "util.h"
67
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71
72 /*
73 * Maximum number of times to retry a send when transmitting
74 * a large message fragment
75 */
76 #define MAX_RETRIES 100
77
78 /*
79  * ZCB files are created with the following umask (the same umask as used in libqb)
80 */
81 #define CPG_MEMORY_MAP_UMASK 077
82
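/*
 * Reassembly state for one fragmented ("partial") message, keyed by the
 * (nodeid, pid) of the sending process. Fragments are appended to
 * assembly_buf (assembly_buf_ptr is the current fill offset) until the
 * LIBCPG_PARTIAL_LAST fragment arrives and the complete message is delivered.
 */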
83 struct cpg_assembly_data
84 {
85 struct list_head list;
86 uint32_t nodeid;
87 uint32_t pid;
88 char *assembly_buf;
89 uint32_t assembly_buf_ptr;
90 };
91
92 struct cpg_inst {
93 qb_ipcc_connection_t *c;
94 int finalize;
95 void *context;
96 union {
97 cpg_model_data_t model_data;
98 cpg_model_v1_data_t model_v1_data;
99 };
100 struct list_head iteration_list_head;
101 uint32_t max_msg_size;
102 struct list_head assembly_list_head;
103 };
104 static void cpg_inst_free (void *inst);
105
106 DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
107
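/*
 * One in-progress cpg_iteration_* enumeration. Instances are linked into
 * the owning cpg_inst's iteration_list_head so that cpg_finalize () can
 * clean up any iterations that were never finalized explicitly.
 */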
108 struct cpg_iteration_instance_t {
109 cpg_iteration_handle_t cpg_iteration_handle;
110 qb_ipcc_connection_t *conn;
111 hdb_handle_t executive_iteration_handle;
112 struct list_head list;
113 };
114
115 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116
117
118 /*
119 * Internal (not visible by API) functions
120 */
121
122 static cs_error_t
123 coroipcc_msg_send_reply_receive (
124 qb_ipcc_connection_t *c,
125 const struct iovec *iov,
126 unsigned int iov_len,
127 void *res_msg,
128 size_t res_len)
129 {
130 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
131 CS_IPC_TIMEOUT_MS));
132 }
133
134 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
135 {
136 list_del (&cpg_iteration_instance->list);
137 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
138 }
139
140 static void cpg_inst_free (void *inst)
141 {
142 struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
143 qb_ipcc_disconnect(cpg_inst->c);
144 }
145
146 static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
147 {
148 struct list_head *iter, *iter_next;
149 struct cpg_iteration_instance_t *cpg_iteration_instance;
150
151 /*
152  * Traverse the iteration instances and delete them
153 */
154 for (iter = cpg_inst->iteration_list_head.next; iter != &cpg_inst->iteration_list_head;iter = iter_next) {
155 iter_next = iter->next;
156
157 cpg_iteration_instance = list_entry (iter, struct cpg_iteration_instance_t, list);
158
159 cpg_iteration_instance_finalize (cpg_iteration_instance);
160 }
161 hdb_handle_destroy (&cpg_handle_t_db, handle);
162 }
163
164 /**
165 * @defgroup cpg_coroipcc The closed process group API
166 * @ingroup coroipcc
167 *
168 * @{
169 */
170
171 cs_error_t cpg_initialize (
172 cpg_handle_t *handle,
173 cpg_callbacks_t *callbacks)
174 {
175 cpg_model_v1_data_t model_v1_data;
176
177 memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
178
179 if (callbacks) {
180 model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
181 model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
182 }
183
184 return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
185 }
186
187 cs_error_t cpg_model_initialize (
188 cpg_handle_t *handle,
189 cpg_model_t model,
190 cpg_model_data_t *model_data,
191 void *context)
192 {
193 cs_error_t error;
194 struct cpg_inst *cpg_inst;
195
196 if (model != CPG_MODEL_V1) {
197 error = CS_ERR_INVALID_PARAM;
198 goto error_no_destroy;
199 }
200
201 error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
202 if (error != CS_OK) {
203 goto error_no_destroy;
204 }
205
206 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
207 if (error != CS_OK) {
208 goto error_destroy;
209 }
210
211 cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
212 if (cpg_inst->c == NULL) {
213 error = qb_to_cs_error(-errno);
214 goto error_put_destroy;
215 }
216
217 if (model_data != NULL) {
218 switch (model) {
219 case CPG_MODEL_V1:
220 memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
221 if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
222 error = CS_ERR_INVALID_PARAM;
223
224 goto error_destroy;
225 }
226 break;
227 }
228 }
229
230 /* Allow space for corosync internal headers */
231 cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
232 cpg_inst->model_data.model = model;
233 cpg_inst->context = context;
234
235 list_init(&cpg_inst->iteration_list_head);
236
237 list_init(&cpg_inst->assembly_list_head);
238
239 hdb_handle_put (&cpg_handle_t_db, *handle);
240
241 return (CS_OK);
242
243 error_put_destroy:
244 hdb_handle_put (&cpg_handle_t_db, *handle);
245 error_destroy:
246 hdb_handle_destroy (&cpg_handle_t_db, *handle);
247 error_no_destroy:
248 return (error);
249 }
250
251 cs_error_t cpg_finalize (
252 cpg_handle_t handle)
253 {
254 struct cpg_inst *cpg_inst;
255 struct iovec iov;
256 struct req_lib_cpg_finalize req_lib_cpg_finalize;
257 struct res_lib_cpg_finalize res_lib_cpg_finalize;
258 cs_error_t error;
259
260 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
261 if (error != CS_OK) {
262 return (error);
263 }
264
265 /*
266 * Another thread has already started finalizing
267 */
268 if (cpg_inst->finalize) {
269 hdb_handle_put (&cpg_handle_t_db, handle);
270 return (CS_ERR_BAD_HANDLE);
271 }
272
273 cpg_inst->finalize = 1;
274
275 /*
276 * Send service request
277 */
278 req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
279 req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
280
281 iov.iov_base = (void *)&req_lib_cpg_finalize;
282 iov.iov_len = sizeof (struct req_lib_cpg_finalize);
283
284 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
285 &iov,
286 1,
287 &res_lib_cpg_finalize,
288 sizeof (struct res_lib_cpg_finalize));
289
290 cpg_inst_finalize (cpg_inst, handle);
291 hdb_handle_put (&cpg_handle_t_db, handle);
292
293 return (error);
294 }
295
296 cs_error_t cpg_fd_get (
297 cpg_handle_t handle,
298 int *fd)
299 {
300 cs_error_t error;
301 struct cpg_inst *cpg_inst;
302
303 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
304 if (error != CS_OK) {
305 return (error);
306 }
307
308 error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
309
310 hdb_handle_put (&cpg_handle_t_db, handle);
311
312 return (error);
313 }
314
315 cs_error_t cpg_max_atomic_msgsize_get (
316 cpg_handle_t handle,
317 uint32_t *size)
318 {
319 cs_error_t error;
320 struct cpg_inst *cpg_inst;
321
322 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
323 if (error != CS_OK) {
324 return (error);
325 }
326
327 *size = cpg_inst->max_msg_size;
328
329 hdb_handle_put (&cpg_handle_t_db, handle);
330
331 return (error);
332 }
333
334 cs_error_t cpg_context_get (
335 cpg_handle_t handle,
336 void **context)
337 {
338 cs_error_t error;
339 struct cpg_inst *cpg_inst;
340
341 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
342 if (error != CS_OK) {
343 return (error);
344 }
345
346 *context = cpg_inst->context;
347
348 hdb_handle_put (&cpg_handle_t_db, handle);
349
350 return (CS_OK);
351 }
352
353 cs_error_t cpg_context_set (
354 cpg_handle_t handle,
355 void *context)
356 {
357 cs_error_t error;
358 struct cpg_inst *cpg_inst;
359
360 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
361 if (error != CS_OK) {
362 return (error);
363 }
364
365 cpg_inst->context = context;
366
367 hdb_handle_put (&cpg_handle_t_db, handle);
368
369 return (CS_OK);
370 }
371
372 cs_error_t cpg_dispatch (
373 cpg_handle_t handle,
374 cs_dispatch_flags_t dispatch_types)
375 {
376 int timeout = -1;
377 cs_error_t error;
378 int cont = 1; /* always continue do loop except when set to 0 */
379 struct cpg_inst *cpg_inst;
380 struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
381 struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
382 struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
383 struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
384 struct cpg_inst cpg_inst_copy;
385 struct qb_ipc_response_header *dispatch_data;
386 struct cpg_address member_list[CPG_MEMBERS_MAX];
387 struct cpg_address left_list[CPG_MEMBERS_MAX];
388 struct cpg_address joined_list[CPG_MEMBERS_MAX];
389 struct cpg_name group_name;
390 struct cpg_assembly_data *assembly_data;
391 struct list_head *iter, *tmp_iter;
392 mar_cpg_address_t *left_list_start;
393 mar_cpg_address_t *joined_list_start;
394 unsigned int i;
395 struct cpg_ring_id ring_id;
396 uint32_t totem_member_list[CPG_MEMBERS_MAX];
397 int32_t errno_res;
398 char dispatch_buf[IPC_DISPATCH_SIZE];
399
400 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
401 if (error != CS_OK) {
402 return (error);
403 }
404
405 /*
406 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
407 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
408 */
409 if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
410 timeout = 0;
411 }
412
413 dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
414 do {
415 errno_res = qb_ipcc_event_recv (
416 cpg_inst->c,
417 dispatch_buf,
418 IPC_DISPATCH_SIZE,
419 timeout);
420 error = qb_to_cs_error (errno_res);
421 if (error == CS_ERR_BAD_HANDLE) {
422 error = CS_OK;
423 goto error_put;
424 }
425 if (error == CS_ERR_TRY_AGAIN) {
426 if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
427 /*
428 * Don't mask error
429 */
430 goto error_put;
431 }
432 error = CS_OK;
433 if (dispatch_types == CS_DISPATCH_ALL) {
434 break; /* exit the do..while (cont) loop */
435 } else {
436 continue; /* next poll */
437 }
438 }
439 if (error != CS_OK) {
440 goto error_put;
441 }
442
443 /*
444  * Make a copy of the callbacks and message data, unlock the instance, and call the callback.
445  * A risk of this dispatch method is that the callback routines may
446  * run at the same time that cpg_finalize has been called.
447 */
448 memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
449 switch (cpg_inst_copy.model_data.model) {
450 case CPG_MODEL_V1:
451 /*
452 * Dispatch incoming message
453 */
454 switch (dispatch_data->id) {
455 case MESSAGE_RES_CPG_DELIVER_CALLBACK:
456 if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
457 break;
458 }
459
460 res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
461
462 marshall_from_mar_cpg_name_t (
463 &group_name,
464 &res_cpg_deliver_callback->group_name);
465
466 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
467 &group_name,
468 res_cpg_deliver_callback->nodeid,
469 res_cpg_deliver_callback->pid,
470 &res_cpg_deliver_callback->message,
471 res_cpg_deliver_callback->msglen);
472 break;
473
474 case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
475 res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
476
477 marshall_from_mar_cpg_name_t (
478 &group_name,
479 &res_cpg_partial_deliver_callback->group_name);
480
481 /*
482  * Search the list of assemblies for the assembly data matching the current message's (nodeid, pid) pair
483 */
484 assembly_data = NULL;
485 for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head; iter = iter->next) {
486 struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
487 if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
488 assembly_data = current_assembly_data;
489 break;
490 }
491 }
492
493 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
494
495 /*
496  * As this is a LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
497  * If there is one, sending of the previous message must have been interrupted and an
498  * error reported to the sending client, so the stale assembly is dropped here.
499 */
500 if (assembly_data) {
501 list_del (&assembly_data->list);
502 free(assembly_data->assembly_buf);
503 free(assembly_data);
504 assembly_data = NULL;
505 }
506
507 assembly_data = malloc(sizeof(struct cpg_assembly_data));
508 if (!assembly_data) {
509 error = CS_ERR_NO_MEMORY;
510 goto error_put;
511 }
512
513 assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
514 assembly_data->pid = res_cpg_partial_deliver_callback->pid;
515 assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
516 if (!assembly_data->assembly_buf) {
517 free(assembly_data);
518 error = CS_ERR_NO_MEMORY;
519 goto error_put;
520 }
521 assembly_data->assembly_buf_ptr = 0;
522 list_init (&assembly_data->list);
523
524 list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
525 }
526 if (assembly_data) {
527 memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
528 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
529 assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
530
531 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
532 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
533 &group_name,
534 res_cpg_partial_deliver_callback->nodeid,
535 res_cpg_partial_deliver_callback->pid,
536 assembly_data->assembly_buf,
537 res_cpg_partial_deliver_callback->msglen);
538
539 list_del (&assembly_data->list);
540 free(assembly_data->assembly_buf);
541 free(assembly_data);
542 }
543 }
544 break;
545
546 case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
547 if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
548 break;
549 }
550
551 res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
552
553 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
554 marshall_from_mar_cpg_address_t (&member_list[i],
555 &res_cpg_confchg_callback->member_list[i]);
556 }
557 left_list_start = res_cpg_confchg_callback->member_list +
558 res_cpg_confchg_callback->member_list_entries;
559 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
560 marshall_from_mar_cpg_address_t (&left_list[i],
561 &left_list_start[i]);
562 }
563 joined_list_start = res_cpg_confchg_callback->member_list +
564 res_cpg_confchg_callback->member_list_entries +
565 res_cpg_confchg_callback->left_list_entries;
566 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
567 marshall_from_mar_cpg_address_t (&joined_list[i],
568 &joined_list_start[i]);
569 }
570 marshall_from_mar_cpg_name_t (
571 &group_name,
572 &res_cpg_confchg_callback->group_name);
573
574 cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
575 &group_name,
576 member_list,
577 res_cpg_confchg_callback->member_list_entries,
578 left_list,
579 res_cpg_confchg_callback->left_list_entries,
580 joined_list,
581 res_cpg_confchg_callback->joined_list_entries);
582
583 /*
584  * If a member left while its partial message was still being assembled, its assembly data must be removed from the list
585 */
586 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
587 for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head;iter = tmp_iter) {
588 struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
589
590 tmp_iter = iter->next;
591
592 if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
593 continue;
594
595 list_del (&current_assembly_data->list);
596 free(current_assembly_data->assembly_buf);
597 free(current_assembly_data);
598 }
599 }
600
601 break;
602 case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
603 if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
604 break;
605 }
606
607 res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
608
609 marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
610 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
611 totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
612 }
613
614 cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
615 ring_id,
616 res_cpg_totem_confchg_callback->member_list_entries,
617 totem_member_list);
618 break;
619 default:
620 error = CS_ERR_LIBRARY;
621 goto error_put;
622 break;
623 } /* - switch (dispatch_data->id) */
624 break; /* case CPG_MODEL_V1 */
625 } /* - switch (cpg_inst_copy.model_data.model) */
626
627 if (cpg_inst_copy.finalize || cpg_inst->finalize) {
628 /*
629 * If the finalize has been called then get out of the dispatch.
630 */
631 cpg_inst->finalize = 1;
632 error = CS_ERR_BAD_HANDLE;
633 goto error_put;
634 }
635
636 /*
637 * Determine if more messages should be processed
638 */
639 if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
640 cont = 0;
641 }
642 } while (cont);
643
644 error_put:
645 hdb_handle_put (&cpg_handle_t_db, handle);
646 return (error);
647 }
648
649 cs_error_t cpg_join (
650 cpg_handle_t handle,
651 const struct cpg_name *group)
652 {
653 cs_error_t error;
654 struct cpg_inst *cpg_inst;
655 struct iovec iov[2];
656 struct req_lib_cpg_join req_lib_cpg_join;
657 struct res_lib_cpg_join response;
658
659 if (group->length > CPG_MAX_NAME_LENGTH) {
660 return (CS_ERR_NAME_TOO_LONG);
661 }
662
663 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
664 if (error != CS_OK) {
665 return (error);
666 }
667
668 /* Now join */
669 req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
670 req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
671 req_lib_cpg_join.pid = getpid();
672 req_lib_cpg_join.flags = 0;
673
674 switch (cpg_inst->model_data.model) {
675 case CPG_MODEL_V1:
676 req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
677 break;
678 }
679
680 marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
681 group);
682
683 iov[0].iov_base = (void *)&req_lib_cpg_join;
684 iov[0].iov_len = sizeof (struct req_lib_cpg_join);
685
686 do {
687 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
688 &response, sizeof (struct res_lib_cpg_join));
689
690 if (error != CS_OK) {
691 goto error_exit;
692 }
693 } while (response.header.error == CS_ERR_BUSY);
694
695 error = response.header.error;
696
697 error_exit:
698 hdb_handle_put (&cpg_handle_t_db, handle);
699
700 return (error);
701 }
702
703 cs_error_t cpg_leave (
704 cpg_handle_t handle,
705 const struct cpg_name *group)
706 {
707 cs_error_t error;
708 struct cpg_inst *cpg_inst;
709 struct iovec iov[2];
710 struct req_lib_cpg_leave req_lib_cpg_leave;
711 struct res_lib_cpg_leave res_lib_cpg_leave;
712
713 if (group->length > CPG_MAX_NAME_LENGTH) {
714 return (CS_ERR_NAME_TOO_LONG);
715 }
716
717 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
718 if (error != CS_OK) {
719 return (error);
720 }
721
722 req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
723 req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
724 req_lib_cpg_leave.pid = getpid();
725 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
726 group);
727
728 iov[0].iov_base = (void *)&req_lib_cpg_leave;
729 iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
730
731 do {
732 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
733 &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
734
735 if (error != CS_OK) {
736 goto error_exit;
737 }
738 } while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
739
740 error = res_lib_cpg_leave.header.error;
741
742 error_exit:
743 hdb_handle_put (&cpg_handle_t_db, handle);
744
745 return (error);
746 }
747
748 cs_error_t cpg_membership_get (
749 cpg_handle_t handle,
750 struct cpg_name *group_name,
751 struct cpg_address *member_list,
752 int *member_list_entries)
753 {
754 cs_error_t error;
755 struct cpg_inst *cpg_inst;
756 struct iovec iov;
757 struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
758 struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
759 unsigned int i;
760
761 if (group_name->length > CPG_MAX_NAME_LENGTH) {
762 return (CS_ERR_NAME_TOO_LONG);
763 }
764 if (member_list == NULL) {
765 return (CS_ERR_INVALID_PARAM);
766 }
767 if (member_list_entries == NULL) {
768 return (CS_ERR_INVALID_PARAM);
769 }
770
771 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
772 if (error != CS_OK) {
773 return (error);
774 }
775
776 req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
777 req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
778
779 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
780 group_name);
781
782 iov.iov_base = (void *)&req_lib_cpg_membership_get;
783 iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
784
785 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
786 &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
787
788 if (error != CS_OK) {
789 goto error_exit;
790 }
791
792 error = res_lib_cpg_membership_get.header.error;
793
794 /*
795 * Copy results to caller
796 */
797 *member_list_entries = res_lib_cpg_membership_get.member_count;
798 if (member_list) {
799 for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
800 marshall_from_mar_cpg_address_t (&member_list[i],
801 &res_lib_cpg_membership_get.member_list[i]);
802 }
803 }
804
805 error_exit:
806 hdb_handle_put (&cpg_handle_t_db, handle);
807
808 return (error);
809 }
810
811 cs_error_t cpg_local_get (
812 cpg_handle_t handle,
813 unsigned int *local_nodeid)
814 {
815 cs_error_t error;
816 struct cpg_inst *cpg_inst;
817 struct iovec iov;
818 struct req_lib_cpg_local_get req_lib_cpg_local_get;
819 struct res_lib_cpg_local_get res_lib_cpg_local_get;
820
821 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
822 if (error != CS_OK) {
823 return (error);
824 }
825
826 req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
827 req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
828
829 iov.iov_base = (void *)&req_lib_cpg_local_get;
830 iov.iov_len = sizeof (struct req_lib_cpg_local_get);
831
832 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
833 &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
834
835 if (error != CS_OK) {
836 goto error_exit;
837 }
838
839 error = res_lib_cpg_local_get.header.error;
840
841 *local_nodeid = res_lib_cpg_local_get.local_nodeid;
842
843 error_exit:
844 hdb_handle_put (&cpg_handle_t_db, handle);
845
846 return (error);
847 }
848
849 cs_error_t cpg_flow_control_state_get (
850 cpg_handle_t handle,
851 cpg_flow_control_state_t *flow_control_state)
852 {
853 cs_error_t error;
854 struct cpg_inst *cpg_inst;
855
856 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
857 if (error != CS_OK) {
858 return (error);
859 }
860 *flow_control_state = CPG_FLOW_CONTROL_DISABLED;
861 error = CS_OK;
862
863 hdb_handle_put (&cpg_handle_t_db, handle);
864
865 return (error);
866 }
867
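/*
 * Create a zero-filled file of 'bytes' bytes and mmap() it as the backing
 * store for a zero-copy buffer. The file is created in /dev/shm when
 * possible, falling back to LOCALSTATEDIR/run; 'path' (at least PATH_MAX
 * bytes) receives the generated file name. Returns 0 on success, -1 on error.
 */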
868 static int
869 memory_map (char *path, const char *file, void **buf, size_t bytes)
870 {
871 int32_t fd;
872 void *addr;
873 int32_t res;
874 char *buffer;
875 int32_t i;
876 size_t written;
877 size_t page_size;
878 long int sysconf_page_size;
879 mode_t old_umask;
880
881 snprintf (path, PATH_MAX, "/dev/shm/%s", file);
882
883 old_umask = umask(CPG_MEMORY_MAP_UMASK);
884 fd = mkstemp (path);
885 (void)umask(old_umask);
886 if (fd == -1) {
887 snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
888 old_umask = umask(CPG_MEMORY_MAP_UMASK);
889 fd = mkstemp (path);
890 (void)umask(old_umask);
891 if (fd == -1) {
892 return (-1);
893 }
894 }
895
896 res = ftruncate (fd, bytes);
897 if (res == -1) {
898 goto error_close_unlink;
899 }
900 sysconf_page_size = sysconf(_SC_PAGESIZE);
901 if (sysconf_page_size <= 0) {
902 goto error_close_unlink;
903 }
904 page_size = sysconf_page_size;
905 buffer = malloc (page_size);
906 if (buffer == NULL) {
907 goto error_close_unlink;
908 }
909 memset (buffer, 0, page_size);
910 for (i = 0; i < (bytes / page_size); i++) {
911 retry_write:
912 written = write (fd, buffer, page_size);
913 if (written == -1 && errno == EINTR) {
914 goto retry_write;
915 }
916 if (written != page_size) {
917 free (buffer);
918 goto error_close_unlink;
919 }
920 }
921 free (buffer);
922
923 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
924 MAP_SHARED, fd, 0);
925
926 if (addr == MAP_FAILED) {
927 goto error_close_unlink;
928 }
929 #ifdef MADV_NOSYNC
930 madvise(addr, bytes, MADV_NOSYNC);
931 #endif
932
933 res = close (fd);
934 if (res) {
935 munmap(addr, bytes);
936
937 return (-1);
938 }
939 *buf = addr;
940
941 return 0;
942
943 error_close_unlink:
944 close (fd);
945 unlink(path);
946 return -1;
947 }
948
949 cs_error_t cpg_zcb_alloc (
950 cpg_handle_t handle,
951 size_t size,
952 void **buffer)
953 {
954 void *buf = NULL;
955 char path[PATH_MAX];
956 mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
957 struct qb_ipc_response_header res_coroipcs_zc_alloc;
958 size_t map_size;
959 struct iovec iovec;
960 struct coroipcs_zc_header *hdr;
961 cs_error_t error;
962 struct cpg_inst *cpg_inst;
963
964 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
965 if (error != CS_OK) {
966 return (error);
967 }
968
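/*
 * The shared mapping is laid out as
 *   [struct coroipcs_zc_header][struct req_lib_cpg_mcast][user data]
 * so the caller receives a pointer just past the two headers and
 * cpg_zcb_mcast_joined () can later fill the headers in place.
 */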
969 map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
970 if (memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) == -1) { error = CS_ERR_LIBRARY; goto error_exit; }
971
972 if (strlen(path) >= CPG_ZC_PATH_LEN) {
973 unlink(path);
974 munmap (buf, map_size);
975 error = CS_ERR_NAME_TOO_LONG; goto error_exit;
976 }
977
978 req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
979 req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
980 req_coroipcc_zc_alloc.map_size = map_size;
981 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
982
983 iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
984 iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
985
986 error = coroipcc_msg_send_reply_receive (
987 cpg_inst->c,
988 &iovec,
989 1,
990 &res_coroipcs_zc_alloc,
991 sizeof (struct qb_ipc_response_header));
992
993 if (error != CS_OK) {
994 goto error_exit;
995 }
996
997 hdr = (struct coroipcs_zc_header *)buf;
998 hdr->map_size = map_size;
999 *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
1000
1001 error_exit:
1002 hdb_handle_put (&cpg_handle_t_db, handle);
1003 return (error);
1004 }
1005
1006 cs_error_t cpg_zcb_free (
1007 cpg_handle_t handle,
1008 void *buffer)
1009 {
1010 cs_error_t error;
1011 unsigned int res;
1012 struct cpg_inst *cpg_inst;
1013 mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1014 struct qb_ipc_response_header res_coroipcs_zc_free;
1015 struct iovec iovec;
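/* Recover the coroipcs_zc_header that cpg_zcb_alloc () placed in front of
 * the req_lib_cpg_mcast header and the user data. */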
1016 struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1017
1018 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1019 if (error != CS_OK) {
1020 return (error);
1021 }
1022
1023 req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1024 req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1025 req_coroipcc_zc_free.map_size = header->map_size;
1026 req_coroipcc_zc_free.server_address = header->server_address;
1027
1028 iovec.iov_base = (void *)&req_coroipcc_zc_free;
1029 iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1030
1031 error = coroipcc_msg_send_reply_receive (
1032 cpg_inst->c,
1033 &iovec,
1034 1,
1035 &res_coroipcs_zc_free,
1036 sizeof (struct qb_ipc_response_header));
1037
1038 if (error != CS_OK) {
1039 goto error_exit;
1040 }
1041
1042 res = munmap ((void *)header, header->map_size);
1043 if (res == -1) {
1044 error = qb_to_cs_error(-errno);
1045
1046 goto error_exit;
1047 }
1048
1049 error_exit:
1050 hdb_handle_put (&cpg_handle_t_db, handle);
1051
1052 return (error);
1053 }
1054
1055 cs_error_t cpg_zcb_mcast_joined (
1056 cpg_handle_t handle,
1057 cpg_guarantee_t guarantee,
1058 void *msg,
1059 size_t msg_len)
1060 {
1061 cs_error_t error;
1062 struct cpg_inst *cpg_inst;
1063 struct req_lib_cpg_mcast *req_lib_cpg_mcast;
1064 struct res_lib_cpg_mcast res_lib_cpg_mcast;
1065 mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1066 struct coroipcs_zc_header *hdr;
1067 struct iovec iovec;
1068
1069 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1070 if (error != CS_OK) {
1071 return (error);
1072 }
1073
1074 if (msg_len > IPC_REQUEST_SIZE) {
1075 error = CS_ERR_TOO_BIG;
1076 goto error_exit;
1077 }
1078
1079 req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1080 req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1081 msg_len;
1082
1083 req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
1084 req_lib_cpg_mcast->guarantee = guarantee;
1085 req_lib_cpg_mcast->msglen = msg_len;
1086
1087 hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1088
1089 req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1090 req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1091 req_coroipcc_zc_execute.server_address = hdr->server_address;
1092
1093 iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1094 iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1095
1096 error = coroipcc_msg_send_reply_receive (
1097 cpg_inst->c,
1098 &iovec,
1099 1,
1100 &res_lib_cpg_mcast,
1101 sizeof(res_lib_cpg_mcast));
1102
1103 if (error != CS_OK) {
1104 goto error_exit;
1105 }
1106
1107 error = res_lib_cpg_mcast.header.error;
1108
1109 error_exit:
1110 hdb_handle_put (&cpg_handle_t_db, handle);
1111
1112 return (error);
1113 }
1114
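/*
 * Split a message larger than max_msg_size into LIBCPG_PARTIAL_FIRST /
 * LIBCPG_PARTIAL_CONTINUED / LIBCPG_PARTIAL_LAST fragments and send each
 * one as a separate MESSAGE_REQ_CPG_PARTIAL_MCAST request, retrying up to
 * MAX_RETRIES times whenever the server answers CS_ERR_TRY_AGAIN.
 */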
1115 static cs_error_t send_fragments (
1116 struct cpg_inst *cpg_inst,
1117 cpg_guarantee_t guarantee,
1118 size_t msg_len,
1119 const struct iovec *iovec,
1120 unsigned int iov_len)
1121 {
1122 int i;
1123 cs_error_t error = CS_OK;
1124 struct iovec iov[2];
1125 struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
1126 struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1127 size_t sent = 0;
1128 size_t iov_sent = 0;
1129 int retry_count;
1130
1131 req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
1132 req_lib_cpg_mcast.guarantee = guarantee;
1133 req_lib_cpg_mcast.msglen = msg_len;
1134
1135 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1136 iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1137
1138 i=0;
1139 iov_sent = 0 ;
1140 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1141
1142 while (error == CS_OK && sent < msg_len) {
1143
1144 retry_count = 0;
1145 if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1146 iov[1].iov_len = cpg_inst->max_msg_size;
1147 }
1148 else {
1149 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1150 }
1151
1152 if (sent == 0) {
1153 req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
1154 }
1155 else if ((sent + iov[1].iov_len) == msg_len) {
1156 req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
1157 }
1158 else {
1159 req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
1160 }
1161
1162 req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1163 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1164 iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1165
1166 resend:
1167 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1168 &res_lib_cpg_partial_send,
1169 sizeof (res_lib_cpg_partial_send));
1170
1171 if (error == CS_ERR_TRY_AGAIN) {
1172 fprintf(stderr, "sleep. counter=%d\n", retry_count);
1173 if (++retry_count > MAX_RETRIES) {
1174 goto error_exit;
1175 }
1176 usleep(10000);
1177 goto resend;
1178 }
1179
1180 iov_sent += iov[1].iov_len;
1181 sent += iov[1].iov_len;
1182
1183 /* Next iovec */
1184 if (iov_sent >= iovec[i].iov_len) {
1185 i++;
1186 iov_sent = 0;
1187 }
1188 error = res_lib_cpg_partial_send.header.error;
1189 }
1190 error_exit:
1191 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1192
1193 return error;
1194 }
1195
1196
1197 cs_error_t cpg_mcast_joined (
1198 cpg_handle_t handle,
1199 cpg_guarantee_t guarantee,
1200 const struct iovec *iovec,
1201 unsigned int iov_len)
1202 {
1203 int i;
1204 cs_error_t error;
1205 struct cpg_inst *cpg_inst;
1206 struct iovec iov[64];
1207 struct req_lib_cpg_mcast req_lib_cpg_mcast;
1208 size_t msg_len = 0;
1209
1210 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1211 if (error != CS_OK) {
1212 return (error);
1213 }
1214
1215 for (i = 0; i < iov_len; i++ ) {
1216 msg_len += iovec[i].iov_len;
1217 }
1218
1219 if (msg_len > cpg_inst->max_msg_size) {
1220 error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1221 goto error_exit;
1222 }
1223
1224 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1225 msg_len;
1226
1227 req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
1228 req_lib_cpg_mcast.guarantee = guarantee;
1229 req_lib_cpg_mcast.msglen = msg_len;
1230
1231 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1232 iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1233 memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1234
1235 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1236 error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1237 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1238
1239 error_exit:
1240 hdb_handle_put (&cpg_handle_t_db, handle);
1241
1242 return (error);
1243 }
1244
1245 cs_error_t cpg_iteration_initialize(
1246 cpg_handle_t handle,
1247 cpg_iteration_type_t iteration_type,
1248 const struct cpg_name *group,
1249 cpg_iteration_handle_t *cpg_iteration_handle)
1250 {
1251 cs_error_t error;
1252 struct iovec iov;
1253 struct cpg_inst *cpg_inst;
1254 struct cpg_iteration_instance_t *cpg_iteration_instance;
1255 struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
1256 struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
1257
1258 if (group && group->length > CPG_MAX_NAME_LENGTH) {
1259 return (CS_ERR_NAME_TOO_LONG);
1260 }
1261 if (cpg_iteration_handle == NULL) {
1262 return (CS_ERR_INVALID_PARAM);
1263 }
1264
1265 if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1266 (iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1267 return (CS_ERR_INVALID_PARAM);
1268 }
1269
1270 if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1271 iteration_type != CPG_ITERATION_ALL) {
1272
1273 return (CS_ERR_INVALID_PARAM);
1274 }
1275
1276 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1277 if (error != CS_OK) {
1278 return (error);
1279 }
1280
1281 error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1282 sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1283 if (error != CS_OK) {
1284 goto error_put_cpg_db;
1285 }
1286
1287 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1288 (void *)&cpg_iteration_instance));
1289 if (error != CS_OK) {
1290 goto error_destroy;
1291 }
1292
1293 cpg_iteration_instance->conn = cpg_inst->c;
1294
1295 list_init (&cpg_iteration_instance->list);
1296
1297 req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
1298 req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
1299 req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1300 if (group) {
1301 marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1302 }
1303
1304 iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1305 iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1306
1307 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1308 &iov,
1309 1,
1310 &res_lib_cpg_iterationinitialize,
1311 sizeof (struct res_lib_cpg_iterationinitialize));
1312
1313 if (error != CS_OK) {
1314 goto error_put_destroy;
1315 }
1316
1317 cpg_iteration_instance->executive_iteration_handle =
1318 res_lib_cpg_iterationinitialize.iteration_handle;
1319 cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1320
1321 list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
1322
1323 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1324 hdb_handle_put (&cpg_handle_t_db, handle);
1325
1326 return (res_lib_cpg_iterationinitialize.header.error);
1327
1328 error_put_destroy:
1329 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1330 error_destroy:
1331 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1332 error_put_cpg_db:
1333 hdb_handle_put (&cpg_handle_t_db, handle);
1334
1335 return (error);
1336 }
1337
1338 cs_error_t cpg_iteration_next(
1339 cpg_iteration_handle_t handle,
1340 struct cpg_iteration_description_t *description)
1341 {
1342 cs_error_t error;
1343 struct cpg_iteration_instance_t *cpg_iteration_instance;
1344 struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
1345 struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
1346
1347 if (description == NULL) {
1348 return CS_ERR_INVALID_PARAM;
1349 }
1350
1351 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1352 (void *)&cpg_iteration_instance));
1353 if (error != CS_OK) {
1354 goto error_exit;
1355 }
1356
1357 req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1358 req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
1359 req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1360
1361 error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1362 &req_lib_cpg_iterationnext,
1363 req_lib_cpg_iterationnext.header.size));
1364 if (error != CS_OK) {
1365 goto error_put;
1366 }
1367
1368 error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1369 &res_lib_cpg_iterationnext,
1370 sizeof(struct res_lib_cpg_iterationnext), -1));
1371 if (error != CS_OK) {
1372 goto error_put;
1373 }
1374
1375 marshall_from_mar_cpg_iteration_description_t(
1376 description,
1377 &res_lib_cpg_iterationnext.description);
1378
1379 error = res_lib_cpg_iterationnext.header.error;
1380
1381 error_put:
1382 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1383
1384 error_exit:
1385 return (error);
1386 }
1387
1388 cs_error_t cpg_iteration_finalize (
1389 cpg_iteration_handle_t handle)
1390 {
1391 cs_error_t error;
1392 struct iovec iov;
1393 struct cpg_iteration_instance_t *cpg_iteration_instance;
1394 struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
1395 struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
1396
1397 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1398 (void *)&cpg_iteration_instance));
1399 if (error != CS_OK) {
1400 goto error_exit;
1401 }
1402
1403 req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
1404 req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
1405 req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1406
1407 iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1408 iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1409
1410 error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1411 &iov,
1412 1,
1413 &res_lib_cpg_iterationfinalize,
1414 sizeof (struct res_lib_cpg_iterationfinalize));
1415
1416 if (error != CS_OK) {
1417 goto error_put;
1418 }
1419
1420 cpg_iteration_instance_finalize (cpg_iteration_instance);
1421 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1422
1423 return (res_lib_cpg_iterationfinalize.header.error);
1424
1425 error_put:
1426 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1427 error_exit:
1428 return (error);
1429 }
1430
1431 /** @} */
1432