1 /*
2  * vi: set autoindent tabstop=4 shiftwidth=4 :
3  *
4  * Copyright (c) 2006-2015 Red Hat, Inc.
5  *
6  * All rights reserved.
7  *
8  * Author: Christine Caulfield (ccaulfi@redhat.com)
9  * Author: Jan Friesse (jfriesse@redhat.com)
10  *
11  * This software licensed under BSD license, the text of which follows:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions are met:
15  *
16  * - Redistributions of source code must retain the above copyright notice,
17  *   this list of conditions and the following disclaimer.
18  * - Redistributions in binary form must reproduce the above copyright notice,
19  *   this list of conditions and the following disclaimer in the documentation
20  *   and/or other materials provided with the distribution.
21  * - Neither the name of the MontaVista Software, Inc. nor the names of its
22  *   contributors may be used to endorse or promote products derived from this
23  *   software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35  * THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 /*
38  * Provides a closed process group API using the coroipcc executive
39  */
40 
41 #include <config.h>
42 
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #include <sys/mman.h>
50 #include <sys/uio.h>
51 #include <sys/stat.h>
52 #include <errno.h>
53 #include <limits.h>
54 
55 #include <qb/qbdefs.h>
56 #include <qb/qbipcc.h>
57 #include <qb/qblog.h>
58 
59 #include <corosync/hdb.h>
60 #include <corosync/list.h>
61 #include <corosync/corotypes.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/cpg.h>
64 #include <corosync/ipc_cpg.h>
65 
66 #include "util.h"
67 
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71 
72 /*
73  * Maximum number of times to retry a send when transmitting
74  * a large message fragment
75  */
76 #define MAX_RETRIES 100
77 
/*
 * ZCB (zero-copy buffer) files are created with the following umask
 * (the same umask that libqb uses)
 */
81 #define CPG_MEMORY_MAP_UMASK		077
82 
83 struct cpg_assembly_data
84 {
85 	struct list_head list;
86 	uint32_t nodeid;
87 	uint32_t pid;
88 	char *assembly_buf;
89 	uint32_t assembly_buf_ptr;
90 };
91 
92 struct cpg_inst {
93 	qb_ipcc_connection_t *c;
94 	int finalize;
95 	void *context;
96 	union {
97 		cpg_model_data_t model_data;
98 		cpg_model_v1_data_t model_v1_data;
99 	};
100 	struct list_head iteration_list_head;
101 	uint32_t max_msg_size;
102 	struct list_head assembly_list_head;
103 };
104 static void cpg_inst_free (void *inst);
105 
106 DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
107 
108 struct cpg_iteration_instance_t {
109 	cpg_iteration_handle_t cpg_iteration_handle;
110 	qb_ipcc_connection_t *conn;
111 	hdb_handle_t executive_iteration_handle;
112 	struct list_head list;
113 };
114 
115 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116 
117 
118 /*
119  * Internal (not visible by API) functions
120  */
121 
122 static cs_error_t
coroipcc_msg_send_reply_receive(qb_ipcc_connection_t * c,const struct iovec * iov,unsigned int iov_len,void * res_msg,size_t res_len)123 coroipcc_msg_send_reply_receive (
124 	qb_ipcc_connection_t *c,
125 	const struct iovec *iov,
126 	unsigned int iov_len,
127 	void *res_msg,
128 	size_t res_len)
129 {
130 	return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
131 				CS_IPC_TIMEOUT_MS));
132 }
133 
cpg_iteration_instance_finalize(struct cpg_iteration_instance_t * cpg_iteration_instance)134 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
135 {
136 	list_del (&cpg_iteration_instance->list);
137 	hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
138 }
139 
cpg_inst_free(void * inst)140 static void cpg_inst_free (void *inst)
141 {
142 	struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
143 	qb_ipcc_disconnect(cpg_inst->c);
144 }
145 
/*
 * Finalize every iteration instance still linked on the cpg instance and
 * then destroy the instance handle itself.
 */
static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
{
	struct list_head *head = &cpg_inst->iteration_list_head;
	struct list_head *cur = head->next;

	while (cur != head) {
		/*
		 * Finalizing unlinks cur from the list, so remember its
		 * successor before doing so.
		 */
		struct list_head *following = cur->next;

		cpg_iteration_instance_finalize (
			list_entry (cur, struct cpg_iteration_instance_t, list));

		cur = following;
	}

	hdb_handle_destroy (&cpg_handle_t_db, handle);
}
163 
164 /**
165  * @defgroup cpg_coroipcc The closed process group API
166  * @ingroup coroipcc
167  *
168  * @{
169  */
170 
cpg_initialize(cpg_handle_t * handle,cpg_callbacks_t * callbacks)171 cs_error_t cpg_initialize (
172 	cpg_handle_t *handle,
173 	cpg_callbacks_t *callbacks)
174 {
175 	cpg_model_v1_data_t model_v1_data;
176 
177 	memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
178 
179 	if (callbacks) {
180 		model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
181 		model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
182 	}
183 
184 	return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
185 }
186 
cpg_model_initialize(cpg_handle_t * handle,cpg_model_t model,cpg_model_data_t * model_data,void * context)187 cs_error_t cpg_model_initialize (
188 	cpg_handle_t *handle,
189 	cpg_model_t model,
190 	cpg_model_data_t *model_data,
191 	void *context)
192 {
193 	cs_error_t error;
194 	struct cpg_inst *cpg_inst;
195 
196 	if (model != CPG_MODEL_V1) {
197 		error = CS_ERR_INVALID_PARAM;
198 		goto error_no_destroy;
199 	}
200 
201 	error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
202 	if (error != CS_OK) {
203 		goto error_no_destroy;
204 	}
205 
206 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
207 	if (error != CS_OK) {
208 		goto error_destroy;
209 	}
210 
211 	cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
212 	if (cpg_inst->c == NULL) {
213 		error = qb_to_cs_error(-errno);
214 		goto error_put_destroy;
215 	}
216 
217 	if (model_data != NULL) {
218 		switch (model) {
219 		case CPG_MODEL_V1:
220 			memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
221 			if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
222 				error = CS_ERR_INVALID_PARAM;
223 
224 				goto error_destroy;
225 			}
226 			break;
227 		}
228 	}
229 
230 	/* Allow space for corosync internal headers */
231 	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
232 	cpg_inst->model_data.model = model;
233 	cpg_inst->context = context;
234 
235 	list_init(&cpg_inst->iteration_list_head);
236 
237 	list_init(&cpg_inst->assembly_list_head);
238 
239 	hdb_handle_put (&cpg_handle_t_db, *handle);
240 
241 	return (CS_OK);
242 
243 error_put_destroy:
244 	hdb_handle_put (&cpg_handle_t_db, *handle);
245 error_destroy:
246 	hdb_handle_destroy (&cpg_handle_t_db, *handle);
247 error_no_destroy:
248 	return (error);
249 }
250 
cpg_finalize(cpg_handle_t handle)251 cs_error_t cpg_finalize (
252 	cpg_handle_t handle)
253 {
254 	struct cpg_inst *cpg_inst;
255 	struct iovec iov;
256 	struct req_lib_cpg_finalize req_lib_cpg_finalize;
257 	struct res_lib_cpg_finalize res_lib_cpg_finalize;
258 	cs_error_t error;
259 
260 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
261 	if (error != CS_OK) {
262 		return (error);
263 	}
264 
265 	/*
266 	 * Another thread has already started finalizing
267 	 */
268 	if (cpg_inst->finalize) {
269 		hdb_handle_put (&cpg_handle_t_db, handle);
270 		return (CS_ERR_BAD_HANDLE);
271 	}
272 
273 	cpg_inst->finalize = 1;
274 
275 	/*
276 	 * Send service request
277 	 */
278 	req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
279 	req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
280 
281 	iov.iov_base = (void *)&req_lib_cpg_finalize;
282 	iov.iov_len = sizeof (struct req_lib_cpg_finalize);
283 
284 	error = coroipcc_msg_send_reply_receive (cpg_inst->c,
285 		&iov,
286 		1,
287 		&res_lib_cpg_finalize,
288 		sizeof (struct res_lib_cpg_finalize));
289 
290 	cpg_inst_finalize (cpg_inst, handle);
291 	hdb_handle_put (&cpg_handle_t_db, handle);
292 
293 	return (error);
294 }
295 
cpg_fd_get(cpg_handle_t handle,int * fd)296 cs_error_t cpg_fd_get (
297 	cpg_handle_t handle,
298 	int *fd)
299 {
300 	cs_error_t error;
301 	struct cpg_inst *cpg_inst;
302 
303 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
304 	if (error != CS_OK) {
305 		return (error);
306 	}
307 
308 	error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
309 
310 	hdb_handle_put (&cpg_handle_t_db, handle);
311 
312 	return (error);
313 }
314 
cpg_max_atomic_msgsize_get(cpg_handle_t handle,uint32_t * size)315 cs_error_t cpg_max_atomic_msgsize_get (
316 	cpg_handle_t handle,
317 	uint32_t *size)
318 {
319 	cs_error_t error;
320 	struct cpg_inst *cpg_inst;
321 
322 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
323 	if (error != CS_OK) {
324 		return (error);
325 	}
326 
327 	*size = cpg_inst->max_msg_size;
328 
329 	hdb_handle_put (&cpg_handle_t_db, handle);
330 
331 	return (error);
332 }
333 
cpg_context_get(cpg_handle_t handle,void ** context)334 cs_error_t cpg_context_get (
335 	cpg_handle_t handle,
336 	void **context)
337 {
338 	cs_error_t error;
339 	struct cpg_inst *cpg_inst;
340 
341 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
342 	if (error != CS_OK) {
343 		return (error);
344 	}
345 
346 	*context = cpg_inst->context;
347 
348 	hdb_handle_put (&cpg_handle_t_db, handle);
349 
350 	return (CS_OK);
351 }
352 
cpg_context_set(cpg_handle_t handle,void * context)353 cs_error_t cpg_context_set (
354 	cpg_handle_t handle,
355 	void *context)
356 {
357 	cs_error_t error;
358 	struct cpg_inst *cpg_inst;
359 
360 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
361 	if (error != CS_OK) {
362 		return (error);
363 	}
364 
365 	cpg_inst->context = context;
366 
367 	hdb_handle_put (&cpg_handle_t_db, handle);
368 
369 	return (CS_OK);
370 }
371 
cpg_dispatch(cpg_handle_t handle,cs_dispatch_flags_t dispatch_types)372 cs_error_t cpg_dispatch (
373 	cpg_handle_t handle,
374 	cs_dispatch_flags_t dispatch_types)
375 {
376 	int timeout = -1;
377 	cs_error_t error;
378 	int cont = 1; /* always continue do loop except when set to 0 */
379 	struct cpg_inst *cpg_inst;
380 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
381 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
382 	struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
383 	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
384 	struct cpg_inst cpg_inst_copy;
385 	struct qb_ipc_response_header *dispatch_data;
386 	struct cpg_address member_list[CPG_MEMBERS_MAX];
387 	struct cpg_address left_list[CPG_MEMBERS_MAX];
388 	struct cpg_address joined_list[CPG_MEMBERS_MAX];
389 	struct cpg_name group_name;
390 	struct cpg_assembly_data *assembly_data;
391 	struct list_head *iter, *tmp_iter;
392 	mar_cpg_address_t *left_list_start;
393 	mar_cpg_address_t *joined_list_start;
394 	unsigned int i;
395 	struct cpg_ring_id ring_id;
396 	uint32_t totem_member_list[CPG_MEMBERS_MAX];
397 	int32_t errno_res;
398 	char dispatch_buf[IPC_DISPATCH_SIZE];
399 
400 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
401 	if (error != CS_OK) {
402 		return (error);
403 	}
404 
405 	/*
406 	 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
407 	 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
408 	 */
409 	if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
410 		timeout = 0;
411 	}
412 
413 	dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
414 	do {
415 		errno_res = qb_ipcc_event_recv (
416 			cpg_inst->c,
417 			dispatch_buf,
418 			IPC_DISPATCH_SIZE,
419 			timeout);
420 		error = qb_to_cs_error (errno_res);
421 		if (error == CS_ERR_BAD_HANDLE) {
422 			error = CS_OK;
423 			goto error_put;
424 		}
425 		if (error == CS_ERR_TRY_AGAIN) {
426 			if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
427 				/*
428 				 * Don't mask error
429 				 */
430 				goto error_put;
431 			}
432 			error = CS_OK;
433 			if (dispatch_types == CS_DISPATCH_ALL) {
434 				break; /* exit do while cont is 1 loop */
435 			} else {
436 				continue; /* next poll */
437 			}
438 		}
439 		if (error != CS_OK) {
440 			goto error_put;
441 		}
442 
443 		/*
444 		 * Make copy of callbacks, message data, unlock instance, and call callback
445 		 * A risk of this dispatch method is that the callback routines may
446 		 * operate at the same time that cpgFinalize has been called.
447 		 */
448 		memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
449 		switch (cpg_inst_copy.model_data.model) {
450 		case CPG_MODEL_V1:
451 			/*
452 			 * Dispatch incoming message
453 			 */
454 			switch (dispatch_data->id) {
455 			case MESSAGE_RES_CPG_DELIVER_CALLBACK:
456 				if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
457 					break;
458 				}
459 
460 				res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
461 
462 				marshall_from_mar_cpg_name_t (
463 					&group_name,
464 					&res_cpg_deliver_callback->group_name);
465 
466 				cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
467 					&group_name,
468 					res_cpg_deliver_callback->nodeid,
469 					res_cpg_deliver_callback->pid,
470 					&res_cpg_deliver_callback->message,
471 					res_cpg_deliver_callback->msglen);
472 				break;
473 
474 			case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
475 				res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
476 
477 				marshall_from_mar_cpg_name_t (
478 					&group_name,
479 					&res_cpg_partial_deliver_callback->group_name);
480 
481 				/*
482 				 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
483 				 */
484 				assembly_data = NULL;
485 				for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head; iter = iter->next) {
486 					struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
487 					if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
488 						assembly_data = current_assembly_data;
489 						break;
490 					}
491 				}
492 
493 				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
494 
495 					/*
496 					 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
497 					 * Otherwise the sending of packet must have been interrupted and error should have
498 					 * been reported to sending client. Therefore here last assembly will be dropped.
499 					 */
500 					if (assembly_data) {
501 						list_del (&assembly_data->list);
502 						free(assembly_data->assembly_buf);
503 						free(assembly_data);
504 						assembly_data = NULL;
505 					}
506 
507 					assembly_data = malloc(sizeof(struct cpg_assembly_data));
508 					if (!assembly_data) {
509 						error = CS_ERR_NO_MEMORY;
510 						goto error_put;
511 					}
512 
513 					assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
514 					assembly_data->pid = res_cpg_partial_deliver_callback->pid;
515 					assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
516 					if (!assembly_data->assembly_buf) {
517 						free(assembly_data);
518 						error = CS_ERR_NO_MEMORY;
519 						goto error_put;
520 					}
521 					assembly_data->assembly_buf_ptr = 0;
522 					list_init (&assembly_data->list);
523 
524 					list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
525 				}
526 				if (assembly_data) {
527 					memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
528 						res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
529 					assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
530 
531 					if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
532 						cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
533 							&group_name,
534 							res_cpg_partial_deliver_callback->nodeid,
535 							res_cpg_partial_deliver_callback->pid,
536 							assembly_data->assembly_buf,
537 							res_cpg_partial_deliver_callback->msglen);
538 
539 						list_del (&assembly_data->list);
540 						free(assembly_data->assembly_buf);
541 						free(assembly_data);
542 					}
543 				}
544 				break;
545 
546 			case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
547 				if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
548 					break;
549 				}
550 
551 				res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
552 
553 				for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
554 					marshall_from_mar_cpg_address_t (&member_list[i],
555 						&res_cpg_confchg_callback->member_list[i]);
556 				}
557 				left_list_start = res_cpg_confchg_callback->member_list +
558 					res_cpg_confchg_callback->member_list_entries;
559 				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
560 					marshall_from_mar_cpg_address_t (&left_list[i],
561 						&left_list_start[i]);
562 				}
563 				joined_list_start = res_cpg_confchg_callback->member_list +
564 					res_cpg_confchg_callback->member_list_entries +
565 					res_cpg_confchg_callback->left_list_entries;
566 				for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
567 					marshall_from_mar_cpg_address_t (&joined_list[i],
568 						&joined_list_start[i]);
569 				}
570 				marshall_from_mar_cpg_name_t (
571 					&group_name,
572 					&res_cpg_confchg_callback->group_name);
573 
574 				cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
575 					&group_name,
576 					member_list,
577 					res_cpg_confchg_callback->member_list_entries,
578 					left_list,
579 					res_cpg_confchg_callback->left_list_entries,
580 					joined_list,
581 					res_cpg_confchg_callback->joined_list_entries);
582 
583 				/*
584 				 * If member left while his partial packet was being assembled, assembly data must be removed from list
585 				 */
586 				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
587 					for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head;iter = tmp_iter) {
588 						struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
589 
590 						tmp_iter = iter->next;
591 
592 						if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
593 							continue;
594 
595 						list_del (&current_assembly_data->list);
596 						free(current_assembly_data->assembly_buf);
597 						free(current_assembly_data);
598 					}
599 				}
600 
601 				break;
602 			case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
603 				if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
604 					break;
605 				}
606 
607 				res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
608 
609 				marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
610 				for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
611 					totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
612 				}
613 
614 				cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
615 					ring_id,
616 					res_cpg_totem_confchg_callback->member_list_entries,
617 					totem_member_list);
618 				break;
619 			default:
620 				error = CS_ERR_LIBRARY;
621 				goto error_put;
622 				break;
623 			} /* - switch (dispatch_data->id) */
624 			break; /* case CPG_MODEL_V1 */
625 		} /* - switch (cpg_inst_copy.model_data.model) */
626 
627 		if (cpg_inst_copy.finalize || cpg_inst->finalize) {
628 			/*
629 			 * If the finalize has been called then get out of the dispatch.
630 			 */
631 			cpg_inst->finalize = 1;
632 			error = CS_ERR_BAD_HANDLE;
633 			goto error_put;
634 		}
635 
636 		/*
637 		 * Determine if more messages should be processed
638 		 */
639 		if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
640 			cont = 0;
641 		}
642 	} while (cont);
643 
644 error_put:
645 	hdb_handle_put (&cpg_handle_t_db, handle);
646 	return (error);
647 }
648 
cpg_join(cpg_handle_t handle,const struct cpg_name * group)649 cs_error_t cpg_join (
650     cpg_handle_t handle,
651     const struct cpg_name *group)
652 {
653 	cs_error_t error;
654 	struct cpg_inst *cpg_inst;
655 	struct iovec iov[2];
656 	struct req_lib_cpg_join req_lib_cpg_join;
657 	struct res_lib_cpg_join response;
658 
659 	if (group->length > CPG_MAX_NAME_LENGTH) {
660 		return (CS_ERR_NAME_TOO_LONG);
661 	}
662 
663 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
664 	if (error != CS_OK) {
665 		return (error);
666 	}
667 
668 	/* Now join */
669 	req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
670 	req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
671 	req_lib_cpg_join.pid = getpid();
672 	req_lib_cpg_join.flags = 0;
673 
674 	switch (cpg_inst->model_data.model) {
675 	case CPG_MODEL_V1:
676 		req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
677 		break;
678 	}
679 
680 	marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
681 		group);
682 
683 	iov[0].iov_base = (void *)&req_lib_cpg_join;
684 	iov[0].iov_len = sizeof (struct req_lib_cpg_join);
685 
686 	do {
687 		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
688 			&response, sizeof (struct res_lib_cpg_join));
689 
690 		if (error != CS_OK) {
691 			goto error_exit;
692 		}
693 	} while (response.header.error == CS_ERR_BUSY);
694 
695 	error = response.header.error;
696 
697 error_exit:
698 	hdb_handle_put (&cpg_handle_t_db, handle);
699 
700 	return (error);
701 }
702 
cpg_leave(cpg_handle_t handle,const struct cpg_name * group)703 cs_error_t cpg_leave (
704     cpg_handle_t handle,
705     const struct cpg_name *group)
706 {
707 	cs_error_t error;
708 	struct cpg_inst *cpg_inst;
709 	struct iovec iov[2];
710 	struct req_lib_cpg_leave req_lib_cpg_leave;
711 	struct res_lib_cpg_leave res_lib_cpg_leave;
712 
713         if (group->length > CPG_MAX_NAME_LENGTH) {
714 		return (CS_ERR_NAME_TOO_LONG);
715         }
716 
717 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
718 	if (error != CS_OK) {
719 		return (error);
720 	}
721 
722 	req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
723 	req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
724 	req_lib_cpg_leave.pid = getpid();
725 	marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
726 		group);
727 
728 	iov[0].iov_base = (void *)&req_lib_cpg_leave;
729 	iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
730 
731 	do {
732 		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
733 			&res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
734 
735 		if (error != CS_OK) {
736 			goto error_exit;
737 		}
738 	} while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
739 
740 	error = res_lib_cpg_leave.header.error;
741 
742 error_exit:
743 	hdb_handle_put (&cpg_handle_t_db, handle);
744 
745 	return (error);
746 }
747 
cpg_membership_get(cpg_handle_t handle,struct cpg_name * group_name,struct cpg_address * member_list,int * member_list_entries)748 cs_error_t cpg_membership_get (
749 	cpg_handle_t handle,
750 	struct cpg_name *group_name,
751 	struct cpg_address *member_list,
752 	int *member_list_entries)
753 {
754 	cs_error_t error;
755 	struct cpg_inst *cpg_inst;
756 	struct iovec iov;
757 	struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
758 	struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
759 	unsigned int i;
760 
761 	if (group_name->length > CPG_MAX_NAME_LENGTH) {
762 		return (CS_ERR_NAME_TOO_LONG);
763 	}
764 	if (member_list == NULL) {
765 		return (CS_ERR_INVALID_PARAM);
766 	}
767 	if (member_list_entries == NULL) {
768 		return (CS_ERR_INVALID_PARAM);
769 	}
770 
771 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
772 	if (error != CS_OK) {
773 		return (error);
774 	}
775 
776 	req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
777 	req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
778 
779 	marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
780 		group_name);
781 
782 	iov.iov_base = (void *)&req_lib_cpg_membership_get;
783 	iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
784 
785 	error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
786 			&res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
787 
788 	if (error != CS_OK) {
789 		goto error_exit;
790 	}
791 
792 	error = res_lib_cpg_membership_get.header.error;
793 
794 	/*
795 	 * Copy results to caller
796 	 */
797 	*member_list_entries = res_lib_cpg_membership_get.member_count;
798 	if (member_list) {
799 		for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
800 			marshall_from_mar_cpg_address_t (&member_list[i],
801 				&res_lib_cpg_membership_get.member_list[i]);
802 		}
803 	}
804 
805 error_exit:
806 	hdb_handle_put (&cpg_handle_t_db, handle);
807 
808 	return (error);
809 }
810 
cpg_local_get(cpg_handle_t handle,unsigned int * local_nodeid)811 cs_error_t cpg_local_get (
812 	cpg_handle_t handle,
813 	unsigned int *local_nodeid)
814 {
815 	cs_error_t error;
816 	struct cpg_inst *cpg_inst;
817 	struct iovec iov;
818 	struct req_lib_cpg_local_get req_lib_cpg_local_get;
819 	struct res_lib_cpg_local_get res_lib_cpg_local_get;
820 
821 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
822 	if (error != CS_OK) {
823 		return (error);
824 	}
825 
826 	req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
827 	req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
828 
829 	iov.iov_base = (void *)&req_lib_cpg_local_get;
830 	iov.iov_len = sizeof (struct req_lib_cpg_local_get);
831 
832 	error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
833 		&res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
834 
835 	if (error != CS_OK) {
836 		goto error_exit;
837 	}
838 
839 	error = res_lib_cpg_local_get.header.error;
840 
841 	*local_nodeid = res_lib_cpg_local_get.local_nodeid;
842 
843 error_exit:
844 	hdb_handle_put (&cpg_handle_t_db, handle);
845 
846 	return (error);
847 }
848 
cpg_flow_control_state_get(cpg_handle_t handle,cpg_flow_control_state_t * flow_control_state)849 cs_error_t cpg_flow_control_state_get (
850 	cpg_handle_t handle,
851 	cpg_flow_control_state_t *flow_control_state)
852 {
853 	cs_error_t error;
854 	struct cpg_inst *cpg_inst;
855 
856 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
857 	if (error != CS_OK) {
858 		return (error);
859 	}
860 	*flow_control_state = CPG_FLOW_CONTROL_DISABLED;
861 	error = CS_OK;
862 
863 	hdb_handle_put (&cpg_handle_t_db, handle);
864 
865 	return (error);
866 }
867 
868 static int
memory_map(char * path,const char * file,void ** buf,size_t bytes)869 memory_map (char *path, const char *file, void **buf, size_t bytes)
870 {
871 	int32_t fd;
872 	void *addr;
873 	int32_t res;
874 	char *buffer;
875 	int32_t i;
876 	size_t written;
877 	size_t page_size;
878 	long int sysconf_page_size;
879 	mode_t old_umask;
880 
881 	snprintf (path, PATH_MAX, "/dev/shm/%s", file);
882 
883 	old_umask = umask(CPG_MEMORY_MAP_UMASK);
884 	fd = mkstemp (path);
885 	(void)umask(old_umask);
886 	if (fd == -1) {
887 		snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
888 		old_umask = umask(CPG_MEMORY_MAP_UMASK);
889 		fd = mkstemp (path);
890 		(void)umask(old_umask);
891 		if (fd == -1) {
892 			return (-1);
893 		}
894 	}
895 
896 	res = ftruncate (fd, bytes);
897 	if (res == -1) {
898 		goto error_close_unlink;
899 	}
900 	sysconf_page_size = sysconf(_SC_PAGESIZE);
901 	if (sysconf_page_size <= 0) {
902 		goto error_close_unlink;
903 	}
904 	page_size = sysconf_page_size;
905 	buffer = malloc (page_size);
906 	if (buffer == NULL) {
907 		goto error_close_unlink;
908 	}
909 	memset (buffer, 0, page_size);
910 	for (i = 0; i < (bytes / page_size); i++) {
911 retry_write:
912 		written = write (fd, buffer, page_size);
913 		if (written == -1 && errno == EINTR) {
914 			goto retry_write;
915 		}
916 		if (written != page_size) {
917 			free (buffer);
918 			goto error_close_unlink;
919 		}
920 	}
921 	free (buffer);
922 
923 	addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
924 		MAP_SHARED, fd, 0);
925 
926 	if (addr == MAP_FAILED) {
927 		goto error_close_unlink;
928 	}
929 #ifdef MADV_NOSYNC
930 	madvise(addr, bytes, MADV_NOSYNC);
931 #endif
932 
933 	res = close (fd);
934 	if (res) {
935 		munmap(addr, bytes);
936 
937 		return (-1);
938 	}
939 	*buf = addr;
940 
941 	return 0;
942 
943 error_close_unlink:
944 	close (fd);
945 	unlink(path);
946 	return -1;
947 }
948 
cpg_zcb_alloc(cpg_handle_t handle,size_t size,void ** buffer)949 cs_error_t cpg_zcb_alloc (
950 	cpg_handle_t handle,
951 	size_t size,
952 	void **buffer)
953 {
954 	void *buf = NULL;
955 	char path[PATH_MAX];
956 	mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
957 	struct qb_ipc_response_header res_coroipcs_zc_alloc;
958 	size_t map_size;
959 	struct iovec iovec;
960 	struct coroipcs_zc_header *hdr;
961 	cs_error_t error;
962 	struct cpg_inst *cpg_inst;
963 
964 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
965 	if (error != CS_OK) {
966 		return (error);
967 	}
968 
969 	map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
970 	assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
971 
972 	if (strlen(path) >= CPG_ZC_PATH_LEN) {
973 		unlink(path);
974 		munmap (buf, map_size);
975 		return (CS_ERR_NAME_TOO_LONG);
976 	}
977 
978 	req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
979 	req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
980 	req_coroipcc_zc_alloc.map_size = map_size;
981 	strcpy (req_coroipcc_zc_alloc.path_to_file, path);
982 
983 	iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
984 	iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
985 
986 	error = coroipcc_msg_send_reply_receive (
987 		cpg_inst->c,
988 		&iovec,
989 		1,
990 		&res_coroipcs_zc_alloc,
991 		sizeof (struct qb_ipc_response_header));
992 
993 	if (error != CS_OK) {
994 		goto error_exit;
995 	}
996 
997 	hdr = (struct coroipcs_zc_header *)buf;
998 	hdr->map_size = map_size;
999 	*buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
1000 
1001 error_exit:
1002 	hdb_handle_put (&cpg_handle_t_db, handle);
1003 	return (error);
1004 }
1005 
cpg_zcb_free(cpg_handle_t handle,void * buffer)1006 cs_error_t cpg_zcb_free (
1007 	cpg_handle_t handle,
1008 	void *buffer)
1009 {
1010 	cs_error_t error;
1011 	unsigned int res;
1012 	struct cpg_inst *cpg_inst;
1013 	mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1014 	struct qb_ipc_response_header res_coroipcs_zc_free;
1015 	struct iovec iovec;
1016 	struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1017 
1018 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1019 	if (error != CS_OK) {
1020 		return (error);
1021 	}
1022 
1023 	req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1024 	req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1025 	req_coroipcc_zc_free.map_size = header->map_size;
1026 	req_coroipcc_zc_free.server_address = header->server_address;
1027 
1028 	iovec.iov_base = (void *)&req_coroipcc_zc_free;
1029 	iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1030 
1031 	error = coroipcc_msg_send_reply_receive (
1032 		cpg_inst->c,
1033 		&iovec,
1034 		1,
1035 		&res_coroipcs_zc_free,
1036 		sizeof (struct qb_ipc_response_header));
1037 
1038 	if (error != CS_OK) {
1039 		goto error_exit;
1040 	}
1041 
1042 	res = munmap ((void *)header, header->map_size);
1043 	if (res == -1) {
1044 		error = qb_to_cs_error(-errno);
1045 
1046 		goto error_exit;
1047 	}
1048 
1049 error_exit:
1050 	hdb_handle_put (&cpg_handle_t_db, handle);
1051 
1052 	return (error);
1053 }
1054 
cpg_zcb_mcast_joined(cpg_handle_t handle,cpg_guarantee_t guarantee,void * msg,size_t msg_len)1055 cs_error_t cpg_zcb_mcast_joined (
1056 	cpg_handle_t handle,
1057 	cpg_guarantee_t guarantee,
1058 	void *msg,
1059 	size_t msg_len)
1060 {
1061 	cs_error_t error;
1062 	struct cpg_inst *cpg_inst;
1063 	struct req_lib_cpg_mcast *req_lib_cpg_mcast;
1064 	struct res_lib_cpg_mcast res_lib_cpg_mcast;
1065 	mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1066 	struct coroipcs_zc_header *hdr;
1067 	struct iovec iovec;
1068 
1069 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1070 	if (error != CS_OK) {
1071 		return (error);
1072 	}
1073 
1074 	if (msg_len > IPC_REQUEST_SIZE) {
1075 		error = CS_ERR_TOO_BIG;
1076 		goto error_exit;
1077 	}
1078 
1079 	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1080 	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1081 		msg_len;
1082 
1083 	req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
1084 	req_lib_cpg_mcast->guarantee = guarantee;
1085 	req_lib_cpg_mcast->msglen = msg_len;
1086 
1087 	hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1088 
1089 	req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1090 	req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1091 	req_coroipcc_zc_execute.server_address = hdr->server_address;
1092 
1093 	iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1094 	iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1095 
1096 	error = coroipcc_msg_send_reply_receive (
1097 		cpg_inst->c,
1098 		&iovec,
1099 		1,
1100 		&res_lib_cpg_mcast,
1101 		sizeof(res_lib_cpg_mcast));
1102 
1103 	if (error != CS_OK) {
1104 		goto error_exit;
1105 	}
1106 
1107 	error = res_lib_cpg_mcast.header.error;
1108 
1109 error_exit:
1110 	hdb_handle_put (&cpg_handle_t_db, handle);
1111 
1112 	return (error);
1113 }
1114 
send_fragments(struct cpg_inst * cpg_inst,cpg_guarantee_t guarantee,size_t msg_len,const struct iovec * iovec,unsigned int iov_len)1115 static cs_error_t send_fragments (
1116 	struct cpg_inst *cpg_inst,
1117 	cpg_guarantee_t guarantee,
1118 	size_t msg_len,
1119 	const struct iovec *iovec,
1120 	unsigned int iov_len)
1121 {
1122 	int i;
1123 	cs_error_t error = CS_OK;
1124 	struct iovec iov[2];
1125 	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
1126 	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1127 	size_t sent = 0;
1128 	size_t iov_sent = 0;
1129 	int retry_count;
1130 
1131 	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
1132 	req_lib_cpg_mcast.guarantee = guarantee;
1133 	req_lib_cpg_mcast.msglen = msg_len;
1134 
1135 	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1136 	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1137 
1138 	i=0;
1139 	iov_sent = 0 ;
1140 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
1141 
1142 	while (error == CS_OK && sent < msg_len) {
1143 
1144 		retry_count = 0;
1145 		if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1146 			iov[1].iov_len = cpg_inst->max_msg_size;
1147 		}
1148 		else {
1149 			iov[1].iov_len = iovec[i].iov_len - iov_sent;
1150 		}
1151 
1152 		if (sent == 0) {
1153 			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
1154 		}
1155 		else if ((sent + iov[1].iov_len) == msg_len) {
1156 			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
1157 		}
1158 		else {
1159 			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
1160 		}
1161 
1162 		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1163 		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1164 		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1165 
1166 	resend:
1167 		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1168 							 &res_lib_cpg_partial_send,
1169 							 sizeof (res_lib_cpg_partial_send));
1170 
1171 		if (error == CS_ERR_TRY_AGAIN) {
1172 			fprintf(stderr, "sleep. counter=%d\n", retry_count);
1173 			if (++retry_count > MAX_RETRIES) {
1174 				goto error_exit;
1175 			}
1176 			usleep(10000);
1177 			goto resend;
1178 		}
1179 
1180 		iov_sent += iov[1].iov_len;
1181 		sent += iov[1].iov_len;
1182 
1183 		/* Next iovec */
1184 		if (iov_sent >= iovec[i].iov_len) {
1185 			i++;
1186 			iov_sent = 0;
1187 		}
1188 		error = res_lib_cpg_partial_send.header.error;
1189 	}
1190 error_exit:
1191 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
1192 
1193 	return error;
1194 }
1195 
1196 
cpg_mcast_joined(cpg_handle_t handle,cpg_guarantee_t guarantee,const struct iovec * iovec,unsigned int iov_len)1197 cs_error_t cpg_mcast_joined (
1198 	cpg_handle_t handle,
1199 	cpg_guarantee_t guarantee,
1200 	const struct iovec *iovec,
1201 	unsigned int iov_len)
1202 {
1203 	int i;
1204 	cs_error_t error;
1205 	struct cpg_inst *cpg_inst;
1206 	struct iovec iov[64];
1207 	struct req_lib_cpg_mcast req_lib_cpg_mcast;
1208 	size_t msg_len = 0;
1209 
1210 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1211 	if (error != CS_OK) {
1212 		return (error);
1213 	}
1214 
1215 	for (i = 0; i < iov_len; i++ ) {
1216 		msg_len += iovec[i].iov_len;
1217 	}
1218 
1219 	if (msg_len > cpg_inst->max_msg_size) {
1220 		error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1221 		goto error_exit;
1222 	}
1223 
1224 	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1225 		msg_len;
1226 
1227 	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
1228 	req_lib_cpg_mcast.guarantee = guarantee;
1229 	req_lib_cpg_mcast.msglen = msg_len;
1230 
1231 	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1232 	iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1233 	memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1234 
1235 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
1236 	error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1237 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
1238 
1239 error_exit:
1240 	hdb_handle_put (&cpg_handle_t_db, handle);
1241 
1242 	return (error);
1243 }
1244 
cpg_iteration_initialize(cpg_handle_t handle,cpg_iteration_type_t iteration_type,const struct cpg_name * group,cpg_iteration_handle_t * cpg_iteration_handle)1245 cs_error_t cpg_iteration_initialize(
1246 	cpg_handle_t handle,
1247 	cpg_iteration_type_t iteration_type,
1248 	const struct cpg_name *group,
1249 	cpg_iteration_handle_t *cpg_iteration_handle)
1250 {
1251 	cs_error_t error;
1252 	struct iovec iov;
1253 	struct cpg_inst *cpg_inst;
1254 	struct cpg_iteration_instance_t *cpg_iteration_instance;
1255 	struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
1256 	struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
1257 
1258 	if (group && group->length > CPG_MAX_NAME_LENGTH) {
1259 		return (CS_ERR_NAME_TOO_LONG);
1260 	}
1261 	if (cpg_iteration_handle == NULL) {
1262 		return (CS_ERR_INVALID_PARAM);
1263 	}
1264 
1265 	if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1266 		(iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1267 		return (CS_ERR_INVALID_PARAM);
1268 	}
1269 
1270 	if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1271 	    iteration_type != CPG_ITERATION_ALL) {
1272 
1273 		return (CS_ERR_INVALID_PARAM);
1274 	}
1275 
1276 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1277 	if (error != CS_OK) {
1278 		return (error);
1279 	}
1280 
1281 	error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1282 		sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1283 	if (error != CS_OK) {
1284 		goto error_put_cpg_db;
1285 	}
1286 
1287 	error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1288 		(void *)&cpg_iteration_instance));
1289 	if (error != CS_OK) {
1290 		goto error_destroy;
1291 	}
1292 
1293 	cpg_iteration_instance->conn = cpg_inst->c;
1294 
1295 	list_init (&cpg_iteration_instance->list);
1296 
1297 	req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
1298 	req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
1299 	req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1300 	if (group) {
1301 		marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1302 	}
1303 
1304 	iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1305 	iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1306 
1307 	error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1308 		&iov,
1309 		1,
1310 		&res_lib_cpg_iterationinitialize,
1311 		sizeof (struct res_lib_cpg_iterationinitialize));
1312 
1313 	if (error != CS_OK) {
1314 		goto error_put_destroy;
1315 	}
1316 
1317 	cpg_iteration_instance->executive_iteration_handle =
1318 		res_lib_cpg_iterationinitialize.iteration_handle;
1319 	cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1320 
1321 	list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
1322 
1323 	hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1324 	hdb_handle_put (&cpg_handle_t_db, handle);
1325 
1326 	return (res_lib_cpg_iterationinitialize.header.error);
1327 
1328 error_put_destroy:
1329 	hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1330 error_destroy:
1331 	hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1332 error_put_cpg_db:
1333 	hdb_handle_put (&cpg_handle_t_db, handle);
1334 
1335 	return (error);
1336 }
1337 
cpg_iteration_next(cpg_iteration_handle_t handle,struct cpg_iteration_description_t * description)1338 cs_error_t cpg_iteration_next(
1339 	cpg_iteration_handle_t handle,
1340 	struct cpg_iteration_description_t *description)
1341 {
1342 	cs_error_t error;
1343 	struct cpg_iteration_instance_t *cpg_iteration_instance;
1344 	struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
1345 	struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
1346 
1347 	if (description == NULL) {
1348 		return CS_ERR_INVALID_PARAM;
1349 	}
1350 
1351 	error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1352 		(void *)&cpg_iteration_instance));
1353 	if (error != CS_OK) {
1354 		goto error_exit;
1355 	}
1356 
1357 	req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1358 	req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
1359 	req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1360 
1361 	error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1362 				&req_lib_cpg_iterationnext,
1363 				req_lib_cpg_iterationnext.header.size));
1364 	if (error != CS_OK) {
1365 		goto error_put;
1366 	}
1367 
1368 	error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1369 				&res_lib_cpg_iterationnext,
1370 				sizeof(struct res_lib_cpg_iterationnext), -1));
1371 	if (error != CS_OK) {
1372 		goto error_put;
1373 	}
1374 
1375 	marshall_from_mar_cpg_iteration_description_t(
1376 			description,
1377 			&res_lib_cpg_iterationnext.description);
1378 
1379 	error = res_lib_cpg_iterationnext.header.error;
1380 
1381 error_put:
1382 	hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1383 
1384 error_exit:
1385 	return (error);
1386 }
1387 
cpg_iteration_finalize(cpg_iteration_handle_t handle)1388 cs_error_t cpg_iteration_finalize (
1389 	cpg_iteration_handle_t handle)
1390 {
1391 	cs_error_t error;
1392 	struct iovec iov;
1393 	struct cpg_iteration_instance_t *cpg_iteration_instance;
1394 	struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
1395 	struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
1396 
1397 	error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1398 		(void *)&cpg_iteration_instance));
1399 	if (error != CS_OK) {
1400 		goto error_exit;
1401 	}
1402 
1403 	req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
1404 	req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
1405 	req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1406 
1407 	iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1408 	iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1409 
1410 	error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1411 		&iov,
1412 		1,
1413 		&res_lib_cpg_iterationfinalize,
1414 		sizeof (struct req_lib_cpg_iterationfinalize));
1415 
1416 	if (error != CS_OK) {
1417 		goto error_put;
1418 	}
1419 
1420 	cpg_iteration_instance_finalize (cpg_iteration_instance);
1421 	hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1422 
1423 	return (res_lib_cpg_iterationfinalize.header.error);
1424 
1425 error_put:
1426 	hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1427 error_exit:
1428 	return (error);
1429 }
1430 
1431 /** @} */
1432