/*
 * Copyright (c) 2003-2005 MontaVista Software, Inc.
 * Copyright (c) 2005 OSDL.
 * Copyright (c) 2006-2012 Red Hat, Inc.
 *
 * All rights reserved.
 *
 * Author: Steven Dake (sdake@redhat.com)
 * Author: Mark Haverkamp (markh@osdl.org)
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * FRAGMENTATION AND PACKING ALGORITHM:
 *
 * Assemble the entire message into one buffer
 * if full fragment
 *	store fragment into lengths list
 *	for each full fragment
 *		multicast fragment
 *		set length and fragment fields of pg message
 *	store remaining multicast into head of fragmentation data and set lens field
 *
 * If a message exceeds the maximum packet size allowed by the totem
 * single ring protocol, the protocol could lose forward progress.
 * Statically calculating the allowed data amount doesn't work because
 * the amount of data allowed depends on the number of fragments in
 * each message.  In this implementation, the maximum fragment size
 * is dynamically calculated for each fragment added to the message.
 *
 * It is possible for a message to be two bytes short of the maximum
 * packet size.  This occurs when a message or collection of
 * messages + the mcast header + the lens are two bytes short of the
 * end of the packet.  Since another len field consumes two bytes, the
 * len field would consume the rest of the packet without room for data.
 *
 * One optimization would be to forgo the final len field and determine
 * it from the size of the udp datagram.  Then this condition would no
 * longer occur.
 */
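
/*
 * Illustrative sketch (a summary of the code below, not an additional
 * protocol definition): the packed frame built by callback_token_received_fn()
 * and mcast_msg() is the totempg_mcast header, followed by one 16-bit length
 * per packed message, followed by the packed message data itself:
 *
 *   +----------------------+----------------------------+------------------+
 *   | struct totempg_mcast | unsigned short [msg_count] | packed msg data  |
 *   | header, fragmented,  | length of each packed msg  | msg 0, msg 1, ...|
 *   | continuation, count  |                            |                  |
 *   +----------------------+----------------------------+------------------+
 *
 * so the room available for message data in one frame is
 * net_mtu - sizeof (struct totempg_mcast) - msg_count * sizeof (unsigned short).
 */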

/*
 * ASSEMBLY AND UNPACKING ALGORITHM:
 *
 * copy incoming packet into assembly data buffer indexed by current
 * location of end of fragment
 *
 * if not fragmented
 *	deliver all messages in assembly data buffer
 * else
 * if msg_count > 1 and fragmented
 *	deliver all messages except last message in assembly data buffer
 *	copy last fragmented section to start of assembly data buffer
 * else
 * if msg_count = 1 and fragmented
 *	do nothing
 *
 */
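
/*
 * Illustrative trace of the cases above (this only restates what
 * totempg_deliver_fn() below does for a message B split across two frames):
 *
 *   packet 1: msg_count = 2, fragmented = 1, continuation = 0
 *	msg A is complete, only the first part of msg B is present
 *	-> deliver A, move the partial B to the start of the assembly
 *	   buffer and remember its length in assembly->index
 *   packet 2: msg_count = 1, fragmented = 0, continuation = 1
 *	-> append the rest of B at assembly->index, deliver the now
 *	   complete B, then reset the assembly buffer
 */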

#include <config.h>

#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <errno.h>
#include <limits.h>

#include <corosync/swab.h>
#include <corosync/list.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
#include <corosync/totem/totempg.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>

#include "totemmrp.h"
#include "totemsrp.h"

#define min(a,b) (((a) < (b)) ? (a) : (b))

struct totempg_mcast_header {
	short version;
	short type;
};

#if !(defined(__i386__) || defined(__x86_64__))
/*
 * Alignment is needed on architectures other than i386 and x86_64
 */
#define TOTEMPG_NEED_ALIGN 1
#endif

/*
 * totempg_mcast structure
 *
 * header:		Identify the mcast.
 * fragmented:		Set if this message continues into next message
 * continuation:	Set if this message is a continuation from last message
 * msg_count:		Indicates how many packed messages are contained
 *			in the mcast.
 * Also, the size of each packed message and the messages themselves are
 * appended to the end of this structure when sent.
 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;
	unsigned char continuation;
	unsigned short msg_count;
	/*
	 * short msg_len[msg_count];
	 */
	/*
	 * data for messages
	 */
};
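
/*
 * A minimal sketch of how the two implicit trailing arrays are recovered
 * from a received frame; this mirrors what totempg_deliver_fn() does, and
 * the variable names here are for illustration only:
 *
 *	const struct totempg_mcast *mcast = msg;
 *	const unsigned short *msg_lens = (const unsigned short *)
 *		((const char *)msg + sizeof (struct totempg_mcast));
 *	const char *data = (const char *)msg + sizeof (struct totempg_mcast) +
 *		mcast->msg_count * sizeof (unsigned short);
 *
 * msg_lens[i] is then the length of packed message i inside data.
 */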

/*
 * Maximum packet size for totem pg messages
 */
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))

/*
 * Local variables used for packing small messages
 */
static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];

static int mcast_packed_msg_count = 0;

static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static totem_queue_level_changed_fn totem_queue_level_changed = NULL;

static uint32_t totempg_threaded_mode = 0;

/*
 * Function and data used to log messages
 */
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));

struct totem_config *totempg_totem_config;

static totempg_stats_t totempg_stats;

enum throw_away_mode {
	THROW_AWAY_INACTIVE,
	THROW_AWAY_ACTIVE
};

struct assembly {
	unsigned int nodeid;
	unsigned char data[MESSAGE_SIZE_MAX];
	int index;
	unsigned char last_frag_num;
	enum throw_away_mode throw_away_mode;
	struct list_head list;
};

static void assembly_deref (struct assembly *assembly);

static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data);

DECLARE_LIST_INIT(assembly_list_inuse);

/*
 * Free list is used both for transitional and operational assemblies
 */
DECLARE_LIST_INIT(assembly_list_free);

DECLARE_LIST_INIT(assembly_list_inuse_trans);

DECLARE_LIST_INIT(totempg_groups_list);

/*
 * Staging buffer for packed messages.  Messages are staged in this buffer
 * before sending.  Multiple messages may fit, which cuts down on the
 * number of mcasts sent.  If a message doesn't completely fit, then
 * the mcast header has a fragment bit set to indicate that more data
 * follows.  fragment_size is an index into the buffer.  It indicates
 * the size of message data and where to place new message data.
 * fragment_continuation indicates whether the first packed message in
 * the buffer is a continuation of a previously packed fragment.
 */
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;

static int totempg_waiting_transack = 0;

struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;

	int groups_cnt;
	int32_t q_level;

	struct list_head list;
};

static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;

#define log_printf(level, format, args...)			\
do {								\
        totempg_log_printf(level,				\
			   totempg_subsys_id,			\
			   __FUNCTION__, __FILE__, __LINE__,	\
			   format, ##args);			\
} while (0);

static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);

static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
	totempg_waiting_transack = waiting_trans_ack;
}

static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct list_head *list;
	struct list_head *active_assembly_list_inuse;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
	}

	/*
	 * Search inuse list for node id and return assembly buffer if found
	 */
	for (list = active_assembly_list_inuse->next;
		list != active_assembly_list_inuse;
		list = list->next) {

		assembly = list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}

	/*
	 * Nothing found in inuse list; get one from free list if available
	 */
	if (list_empty (&assembly_list_free) == 0) {
		assembly = list_entry (assembly_list_free.next, struct assembly, list);
		list_del (&assembly->list);
		list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		assembly->throw_away_mode = THROW_AWAY_INACTIVE;
		return (assembly);
	}

	/*
	 * Nothing available in inuse or free list, so allocate a new one
	 */
	assembly = malloc (sizeof (struct assembly));
	/*
	 * TODO handle memory allocation failure here
	 */
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
	list_init (&assembly->list);
	list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}

static void assembly_deref (struct assembly *assembly)
{

	list_del (&assembly->list);
	list_add (&assembly->list, &assembly_list_free);
}

static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct list_head *list, *list_next;
	struct list_head *active_assembly_list_inuse;
	struct assembly *assembly;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
		}

		for (list = active_assembly_list_inuse->next;
			list != active_assembly_list_inuse;
			list = list_next) {

			list_next = list->next;
			assembly = list_entry (list, struct assembly, list);

			if (nodeid == assembly->nodeid) {
				list_del (&assembly->list);
				list_add (&assembly->list, &assembly_list_free);
			}
		}
	}

}

static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	struct totempg_group_instance *instance;
	struct list_head *list;

	/*
	 * For every leaving processor, add to free list.
	 * This also has the side effect of clearing out the dataset
	 * in the leaving processor's assembly buffer.
	 */
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}

	for (list = totempg_groups_list.next;
		list != &totempg_groups_list;
		list = list->next) {

		instance = list_entry (list, struct totempg_group_instance, list);

		if (instance->confchg_fn) {
			instance->confchg_fn (
				configuration_type,
				member_list,
				member_list_entries,
				left_list,
				left_list_entries,
				joined_list,
				joined_list_entries,
				ring_id);
		}
	}
}

static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	int i;
	char *aligned_msg;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure on architectures other than i386 and x86_64
	 */
	if ((size_t)msg % 4 != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}

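/*
 * For reference when reading the next two functions (this is a descriptive
 * summary of the code, not a separate wire format): every application
 * message sent through totempg_groups_mcast_joined() or
 * totempg_groups_mcast_groups() is prefixed with a group header built from
 * 16-bit values, laid out as
 *
 *	group_len[0]		number of groups the message is addressed to
 *	group_len[1..n]		length of each group name
 *	group names		the n names, concatenated with no padding
 *	application data	the caller's iovecs
 *
 * group_matches() walks this header to decide whether an instance should
 * receive the message, and adjust_iovec is the total header size that is
 * stripped off before delivery to the application.
 */
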
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure on architectures other than i386 and x86_64
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);


	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}


static inline void app_deliver_fn (
	unsigned int nodeid,
	void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_group_instance *instance;
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct iovec *iovec;
	struct list_head *list;

	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

	/*
	 * TODO: segmentation/assembly needs to be redesigned to provide aligned
	 * access in all cases, to avoid memory copies on non-i386 archs.  This
	 * probably breaks backwards compatibility.
	 */

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure on architectures other than i386 and x86_64
	 */
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	for (list = totempg_groups_list.next;
		list != &totempg_groups_list;
		list = list->next) {

		instance = list_entry (list, struct totempg_group_instance, list);
		if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;

#ifdef TOTEMPG_NEED_ALIGN
			/*
			 * Align data structure on architectures other than i386 and x86_64
			 */
			if (((size_t)iovec->iov_base + adjust_iovec) % 4 != 0) {
				/*
				 * Deal with misalignment
				 */
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
#endif
			instance->deliver_fn (
				nodeid,
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
		}
	}
}

static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
// TODO optimize this
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}

static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */

	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	if (endian_conversion_required) {
		for (i = 0; i < mcast->msg_count; i++) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}
	}

	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it.  We'll first deliver the full messages
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, that it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message. Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
					continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);

			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}

/*
 * Totem Process Group Abstraction
 * depends on poll abstraction, POSIX, IPV4
 */

void *callback_token_received_handle;

int callback_token_received_fn (enum totem_callback_token_type type,
				const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	if (totemmrp_avail() == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemmrp_mcast (iovecs, 3, 0);

	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}

/*
 * Initialize the totem process group abstraction
 */
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;
	totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
	totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
	totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	totemsrp_net_mtu_adjust (totem_config);

	res = totemmrp_initialize (
		poll_handle,
		totem_config,
		&totempg_stats,
		totempg_deliver_fn,
		totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);

	if (res == -1) {
		goto error_exit;
	}

	totemmrp_callback_token_create (
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totempg_size_limit = (totemmrp_avail() - 1) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	list_init (&totempg_groups_list);

error_exit:
	return (res);
}

void totempg_finalize (void)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}
	totemmrp_finalize ();
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
}

/*
 * Multicast a message
 */
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	totemmrp_event_signal (TOTEM_EVENT_NEW_MSG, 1);

	/*
	 * Remove zero length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;

	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	/*
	 * Check if we would overwrite new message queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return(-1);
	}

	mcast.header.version = 0;
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment_buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment_buffer on the next call.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;

		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size) {
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			} else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			/*
			 * if we're not on the last iovec or the iovec is too large to
			 * fit, then indicate a fragment. This also means that the next
			 * message will have the continuation of this one.
			 */
			if ((i < (iov_len - 1)) ||
					((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
			}

			/*
			 * assemble the message and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof(struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;
			assert (totemmrp_avail() > 0);
			res = totemmrp_mcast (iovecs, 3, guarantee);
			if (res == -1) {
				goto error_exit;
			}

			/*
			 * Recalculate counts and indexes for the next.
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));

			/*
			 * If the iovec all fit, go to the next iovec
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;

			/*
			 * Continue with the rest of the current iovec.
			 */
			} else {
				copy_base += copy_len;
			}
		}
	}

	/*
	 * Bump only if we added message data.  This may be zero if
	 * the last buffer just fit into the fragmentation_data buffer
	 * and we were at the last iovec.
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}

/*
 * Determine if a message of msg_size could be queued
 */
static int msg_count_send_ok (
	int msg_count)
{
	int avail = 0;

	avail = totemmrp_avail ();
	totempg_stats.msg_queue_avail = avail;

	return ((avail - totempg_reserved) > msg_count);
}

static int byte_count_send_ok (
	int byte_count)
{
	unsigned int msg_count = 0;
	int avail = 0;

	avail = totemmrp_avail ();

	msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;

	return (avail >= msg_count);
}
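
/*
 * Worked example of the calculation above (the numbers are assumptions for
 * illustration only): with net_mtu = 1500 and, say, 8 bytes for
 * sizeof (struct totempg_mcast), each frame carries 1500 - 8 - 16 = 1476
 * bytes of payload, so a 4000 byte message counts as 4000 / 1476 + 1 = 3
 * frames when checking availability here and when reserving queue space in
 * send_reserve() below.
 */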

static int send_reserve (
	int msg_size)
{
	unsigned int msg_count = 0;

	msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
	totempg_reserved += msg_count;
	totempg_stats.msg_reserved = totempg_reserved;

	return (msg_count);
}

static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
	totempg_stats.msg_reserved = totempg_reserved;
}

#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX	((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */

static uint32_t q_level_percent_used(void)
{
	return (100 - (((totemmrp_avail() - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}

int totempg_callback_token_create (
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	unsigned int res;
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	res = totemmrp_callback_token_create (handle_out, type, delete,
		callback_fn, data);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
	return (res);
}

void totempg_callback_token_destroy (
	void *handle_out)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	totemmrp_callback_token_destroy (handle_out);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
}
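
/*
 * Minimal usage sketch for the two wrappers above; the callback body and
 * the handle/function names are hypothetical, only the API calls and the
 * TOTEM_CALLBACK_TOKEN_RECEIVED type come from this file:
 *
 *	static int on_token (enum totem_callback_token_type type, const void *data)
 *	{
 *		return (0);
 *	}
 *
 *	void *token_handle;
 *
 *	totempg_callback_token_create (&token_handle,
 *		TOTEM_CALLBACK_TOKEN_RECEIVED, 0, on_token, NULL);
 *	...
 *	totempg_callback_token_destroy (token_handle);
 */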

/*
 *	vi: set autoindent tabstop=4 shiftwidth=4 :
 */

int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;
	instance->q_level = QB_LOOP_MED;
	list_init (&instance->list);
	list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}

int totempg_groups_join (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	struct totempg_group *new_groups;
	unsigned int res = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,
		sizeof (struct totempg_group) *
		(instance->groups_cnt + group_cnt));
	if (new_groups == 0) {
		res = ENOMEM;
		goto error_exit;
	}
	memcpy (&new_groups[instance->groups_cnt],
		groups, group_cnt * sizeof (struct totempg_group));
	instance->groups = new_groups;
	instance->groups_cnt += group_cnt;

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_groups_leave (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}

#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32

int totempg_groups_mcast_joined (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = instance->groups_cnt;
	for (i = 0; i < instance->groups_cnt; i++) {
		group_len[i + 1] = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
	}
	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}
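
/*
 * Minimal usage sketch for the group API above; the group name, payload and
 * the deliver/confchg callback names are illustrative assumptions, only the
 * functions and the totempg_group type come from this abstraction:
 *
 *	static struct totempg_group my_group = {
 *		.group = "example", .group_len = 7
 *	};
 *	void *groups_instance;
 *	struct iovec iov;
 *
 *	totempg_groups_initialize (&groups_instance, my_deliver_fn, my_confchg_fn);
 *	totempg_groups_join (groups_instance, &my_group, 1);
 *
 *	iov.iov_base = payload;
 *	iov.iov_len = payload_len;
 *	totempg_groups_mcast_joined (groups_instance, &iov, 1, 0);
 */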

static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_percent_used();

	if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
		instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
	} else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
		instance->q_level = TOTEM_Q_LEVEL_LOW;
	} else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
		instance->q_level = TOTEM_Q_LEVEL_GOOD;
	} else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
		instance->q_level = TOTEM_Q_LEVEL_HIGH;
	}
	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}

void totempg_check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;

	check_q_level(instance);
}

int totempg_groups_joined_reserve (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned int size = 0;
	unsigned int i;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < instance->groups_cnt; i++) {
		size += instance->groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {
		reserved = -1;
		goto error_exit;
	}

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);
	} else {
		reserved = 0;
	}

error_exit:
	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (reserved);
}


int totempg_groups_joined_release (int msg_count)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return 0;
}
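
/*
 * Sketch of one plausible reserve/release calling pattern (an assumption
 * about the caller, not something this file enforces): reserve queue space
 * before multicasting and release it once the message has been handed off,
 * so totempg_reserved tracks messages that are promised but not yet queued:
 *
 *	int reserved = totempg_groups_joined_reserve (groups_instance, &iov, 1);
 *
 *	if (reserved == -1) {
 *		message larger than totempg_size_limit; fail the request
 *	} else if (reserved == 0) {
 *		no queue space right now; retry later
 *	} else {
 *		totempg_groups_mcast_joined (groups_instance, &iov, 1, 0);
 *		totempg_groups_joined_release (reserved);
 *	}
 */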

int totempg_groups_mcast_groups (
	void *totempg_groups_instance,
	int guarantee,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		group_len[i + 1] = groups[i].group_len;
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}
	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

/*
 * Returns -1 if error, 0 if can't send, 1 if can send the message
 */
int totempg_groups_send_ok_groups (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned int size = 0;
	unsigned int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {
		size += groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_ifaces_get (
	unsigned int nodeid,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemmrp_ifaces_get (
		nodeid,
		interfaces,
		interfaces_size,
		status,
		iface_count);

	return (res);
}

void totempg_event_signal (enum totem_event_type type, int value)
{
	totemmrp_event_signal (type, value);
}

void* totempg_get_stats (void)
{
	return &totempg_stats;
}

int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	int res;

	res = totemmrp_crypto_set (cipher_type, hash_type);

	return (res);
}

int totempg_ring_reenable (void)
{
	int res;

	res = totemmrp_ring_reenable ();

	return (res);
}

#define ONE_IFACE_LEN 63
const char *totempg_ifaces_print (unsigned int nodeid)
{
	static char iface_string[256 * INTERFACE_MAX];
	char one_iface[ONE_IFACE_LEN+1];
	struct totem_ip_address interfaces[INTERFACE_MAX];
	char **status;
	unsigned int iface_count;
	unsigned int i;
	int res;

	iface_string[0] = '\0';

	res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, &status, &iface_count);
	if (res == -1) {
		return ("no interface found for nodeid");
	}

	for (i = 0; i < iface_count; i++) {
		snprintf (one_iface, ONE_IFACE_LEN,
			  "r(%d) ip(%s) ",
			  i, totemip_print (&interfaces[i]));
		strcat (iface_string, one_iface);
	}
	return (iface_string);
}

unsigned int totempg_my_nodeid_get (void)
{
	return (totemmrp_my_nodeid_get());
}

int totempg_my_family_get (void)
{
	return (totemmrp_my_family_get());
}
extern void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemmrp_service_ready_register (totem_service_ready);
}

void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}

extern int totempg_member_add (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemmrp_member_add (member, ring_no);
}

extern int totempg_member_remove (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemmrp_member_remove (member, ring_no);
}

void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
	totemmrp_threaded_mode_enable ();
}

void totempg_trans_ack (void)
{
	totemmrp_trans_ack ();
}
