1 /*
2 * Copyright (c) 2003-2005 MontaVista Software, Inc.
3 * Copyright (c) 2005 OSDL.
4 * Copyright (c) 2006-2012 Red Hat, Inc.
5 *
6 * All rights reserved.
7 *
8 * Author: Steven Dake (sdake@redhat.com)
9 * Author: Mark Haverkamp (markh@osdl.org)
10 *
11 * This software licensed under BSD license, the text of which follows:
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
36 */
37
38 /*
39 * FRAGMENTATION AND PACKING ALGORITHM:
40 *
41 * Assemble the entire message into one buffer
42 * if full fragment
43 * store fragment into lengths list
44 * for each full fragment
45 * multicast fragment
 *        set length and fragment fields of pg message
47 * store remaining multicast into head of fragmentation data and set lens field
48 *
49 * If a message exceeds the maximum packet size allowed by the totem
50 * single ring protocol, the protocol could lose forward progress.
51 * Statically calculating the allowed data amount doesn't work because
52 * the amount of data allowed depends on the number of fragments in
53 * each message. In this implementation, the maximum fragment size
54 * is dynamically calculated for each fragment added to the message.
55
56 * It is possible for a message to be two bytes short of the maximum
57 * packet size. This occurs when a message or collection of
58 * messages + the mcast header + the lens are two bytes short of the
59 * end of the packet. Since another len field consumes two bytes, the
60 * len field would consume the rest of the packet without room for data.
61 *
62 * One optimization would be to forgo the final len field and determine
63 * it from the size of the udp datagram. Then this condition would no
64 * longer occur.
65 */
66
67 /*
68 * ASSEMBLY AND UNPACKING ALGORITHM:
69 *
70 * copy incoming packet into assembly data buffer indexed by current
71 * location of end of fragment
72 *
73 * if not fragmented
74 * deliver all messages in assembly data buffer
75 * else
76 * if msg_count > 1 and fragmented
77 * deliver all messages except last message in assembly data buffer
78 * copy last fragmented section to start of assembly data buffer
79 * else
80 * if msg_count = 1 and fragmented
81 * do nothing
82 *
83 */
84
85 #include <config.h>
86
87 #ifdef HAVE_ALLOCA_H
88 #include <alloca.h>
89 #endif
90 #include <sys/types.h>
91 #include <sys/socket.h>
92 #include <netinet/in.h>
93 #include <arpa/inet.h>
94 #include <sys/uio.h>
95 #include <stdio.h>
96 #include <stdlib.h>
97 #include <string.h>
98 #include <assert.h>
99 #include <pthread.h>
100 #include <errno.h>
101 #include <limits.h>
102
103 #include <corosync/swab.h>
104 #include <corosync/list.h>
105 #include <qb/qbloop.h>
106 #include <qb/qbipcs.h>
107 #include <corosync/totem/totempg.h>
108 #define LOGSYS_UTILS_ONLY 1
109 #include <corosync/logsys.h>
110
111 #include "totemmrp.h"
112 #include "totemsrp.h"
113
/*
 * Fully parenthesized min() so the expansion is safe inside larger
 * expressions (the previous form `((a) < (b)) ? a : b` misparsed in
 * contexts such as `2 * min(a,b)`). Arguments are still evaluated
 * twice; do not pass expressions with side effects.
 */
#define min(a,b) (((a) < (b)) ? (a) : (b))
115
/*
 * On-the-wire prefix of every totempg multicast frame. Both fields
 * are set to 0 by the senders in this file (callback_token_received_fn
 * sets version and type; mcast_msg sets version).
 */
struct totempg_mcast_header {
	short version;	/* protocol version of this frame; currently 0 */
	short type;	/* frame type discriminator; currently 0 */
};
120
121 #if !(defined(__i386__) || defined(__x86_64__))
/*
 * Alignment is required on architectures other than i386 and x86_64
 */
125 #define TOTEMPG_NEED_ALIGN 1
126 #endif
127
128 /*
129 * totempg_mcast structure
130 *
131 * header: Identify the mcast.
132 * fragmented: Set if this message continues into next message
133 * continuation: Set if this message is a continuation from last message
134 * msg_count Indicates how many packed messages are contained
135 * in the mcast.
136 * Also, the size of each packed message and the messages themselves are
137 * appended to the end of this structure when sent.
138 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	/* non-zero: the last packed message continues in the next frame;
	 * the value is the fragment sequence number (see next_fragment) */
	unsigned char fragmented;
	/* non-zero: the first packed message continues one from the
	 * previous frame; must match the receiver's last_frag_num */
	unsigned char continuation;
	/* number of packed messages carried in this frame */
	unsigned short msg_count;
	/*
	 * short msg_len[msg_count];
	 */
	/*
	 * data for messages
	 */
};
151
152 /*
153 * Maximum packet size for totem pg messages
154 */
155 #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
156 sizeof (struct totempg_mcast))
157
158 /*
159 * Local variables used for packing small messages
160 */
161 static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
162
163 static int mcast_packed_msg_count = 0;
164
165 static int totempg_reserved = 1;
166
167 static unsigned int totempg_size_limit;
168
169 static totem_queue_level_changed_fn totem_queue_level_changed = NULL;
170
171 static uint32_t totempg_threaded_mode = 0;
172
173 /*
174 * Function and data used to log messages
175 */
176 static int totempg_log_level_security;
177 static int totempg_log_level_error;
178 static int totempg_log_level_warning;
179 static int totempg_log_level_notice;
180 static int totempg_log_level_debug;
181 static int totempg_subsys_id;
182 static void (*totempg_log_printf) (
183 int level,
184 int subsys,
185 const char *function,
186 const char *file,
187 int line,
188 const char *format, ...) __attribute__((format(printf, 6, 7)));
189
190 struct totem_config *totempg_totem_config;
191
192 static totempg_stats_t totempg_stats;
193
194 enum throw_away_mode {
195 THROW_AWAY_INACTIVE,
196 THROW_AWAY_ACTIVE
197 };
198
/*
 * Per-sender reassembly buffer. One assembly is associated with each
 * node id that has messages in flight; entries migrate between the
 * inuse lists and assembly_list_free (see assembly_ref/assembly_deref).
 */
struct assembly {
	unsigned int nodeid;			/* sender this buffer belongs to */
	unsigned char data[MESSAGE_SIZE_MAX];	/* accumulated message bytes */
	int index;				/* write offset / end of assembled data */
	unsigned char last_frag_num;		/* fragment seq number expected to continue */
	enum throw_away_mode throw_away_mode;	/* discarding an unassemblable message? */
	struct list_head list;			/* linkage on inuse or free list */
};
207
208 static void assembly_deref (struct assembly *assembly);
209
210 static int callback_token_received_fn (enum totem_callback_token_type type,
211 const void *data);
212
213 DECLARE_LIST_INIT(assembly_list_inuse);
214
215 /*
216 * Free list is used both for transitional and operational assemblies
217 */
218 DECLARE_LIST_INIT(assembly_list_free);
219
220 DECLARE_LIST_INIT(assembly_list_inuse_trans);
221
222 DECLARE_LIST_INIT(totempg_groups_list);
223
224 /*
225 * Staging buffer for packed messages. Messages are staged in this buffer
226 * before sending. Multiple messages may fit which cuts down on the
227 * number of mcasts sent. If a message doesn't completely fit, then
228 * the mcast header has a fragment bit set that says that there are more
229 * data to follow. fragment_size is an index into the buffer. It indicates
230 * the size of message data and where to place new message data.
 * fragment_continuation indicates whether the first packed message in
232 * the buffer is a continuation of a previously packed fragment.
233 */
234 static unsigned char *fragmentation_data;
235
236 static int fragment_size = 0;
237
238 static int fragment_continuation = 0;
239
240 static int totempg_waiting_transack = 0;
241
/*
 * Per-subscriber state: delivery/configuration callbacks plus the set
 * of groups the subscriber joined. Instances are linked on
 * totempg_groups_list and matched against incoming messages by
 * group_matches().
 */
struct totempg_group_instance {
	/* Invoked for each message addressed to one of this instance's groups */
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	/* Invoked on every totem configuration change; may be NULL
	 * (app_confchg_fn checks before calling) */
	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;	/* groups joined by this instance */

	int groups_cnt;			/* number of entries in groups */
	int32_t q_level;		/* presumably the last queue-level band reported
					 * to this instance - not used in this chunk */

	struct list_head list;		/* linkage on totempg_groups_list */
};
263
264 static unsigned char next_fragment = 1;
265
266 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
267
268 static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;
269
270 static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
271
/*
 * Log through the function installed by totempg_initialize().
 * Fix: dropped the trailing semicolon after `while (0)` so the macro
 * expands to exactly one statement and call sites such as
 * `if (x) log_printf(...); else ...` parse correctly.
 */
#define log_printf(level, format, args...) \
do { \
	totempg_log_printf(level, \
		totempg_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args); \
} while (0)
279
280 static int msg_count_send_ok (int msg_count);
281
282 static int byte_count_send_ok (int byte_count);
283
totempg_waiting_trans_ack_cb(int waiting_trans_ack)284 static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
285 {
286 log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
287 totempg_waiting_transack = waiting_trans_ack;
288 }
289
assembly_ref(unsigned int nodeid)290 static struct assembly *assembly_ref (unsigned int nodeid)
291 {
292 struct assembly *assembly;
293 struct list_head *list;
294 struct list_head *active_assembly_list_inuse;
295
296 if (totempg_waiting_transack) {
297 active_assembly_list_inuse = &assembly_list_inuse_trans;
298 } else {
299 active_assembly_list_inuse = &assembly_list_inuse;
300 }
301
302 /*
303 * Search inuse list for node id and return assembly buffer if found
304 */
305 for (list = active_assembly_list_inuse->next;
306 list != active_assembly_list_inuse;
307 list = list->next) {
308
309 assembly = list_entry (list, struct assembly, list);
310
311 if (nodeid == assembly->nodeid) {
312 return (assembly);
313 }
314 }
315
316 /*
317 * Nothing found in inuse list get one from free list if available
318 */
319 if (list_empty (&assembly_list_free) == 0) {
320 assembly = list_entry (assembly_list_free.next, struct assembly, list);
321 list_del (&assembly->list);
322 list_add (&assembly->list, active_assembly_list_inuse);
323 assembly->nodeid = nodeid;
324 assembly->index = 0;
325 assembly->last_frag_num = 0;
326 assembly->throw_away_mode = THROW_AWAY_INACTIVE;
327 return (assembly);
328 }
329
330 /*
331 * Nothing available in inuse or free list, so allocate a new one
332 */
333 assembly = malloc (sizeof (struct assembly));
334 /*
335 * TODO handle memory allocation failure here
336 */
337 assert (assembly);
338 assembly->nodeid = nodeid;
339 assembly->data[0] = 0;
340 assembly->index = 0;
341 assembly->last_frag_num = 0;
342 assembly->throw_away_mode = THROW_AWAY_INACTIVE;
343 list_init (&assembly->list);
344 list_add (&assembly->list, active_assembly_list_inuse);
345
346 return (assembly);
347 }
348
/*
 * Return an assembly buffer to the free list. The data contents are
 * not cleared here; assembly_ref() re-initializes the bookkeeping
 * fields when the entry is reused.
 */
static void assembly_deref (struct assembly *assembly)
{

	list_del (&assembly->list);
	list_add (&assembly->list, &assembly_list_free);
}
355
assembly_deref_from_normal_and_trans(int nodeid)356 static void assembly_deref_from_normal_and_trans (int nodeid)
357 {
358 int j;
359 struct list_head *list, *list_next;
360 struct list_head *active_assembly_list_inuse;
361 struct assembly *assembly;
362
363 for (j = 0; j < 2; j++) {
364 if (j == 0) {
365 active_assembly_list_inuse = &assembly_list_inuse;
366 } else {
367 active_assembly_list_inuse = &assembly_list_inuse_trans;
368 }
369
370 for (list = active_assembly_list_inuse->next;
371 list != active_assembly_list_inuse;
372 list = list_next) {
373
374 list_next = list->next;
375 assembly = list_entry (list, struct assembly, list);
376
377 if (nodeid == assembly->nodeid) {
378 list_del (&assembly->list);
379 list_add (&assembly->list, &assembly_list_free);
380 }
381 }
382 }
383
384 }
385
/*
 * Dispatch a totem configuration change to every registered group
 * instance, after dropping the reassembly state of departed nodes.
 */
static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	size_t idx;
	struct totempg_group_instance *grp_inst;
	struct list_head *cur;

	/*
	 * Move every leaving processor's assembly buffers (normal and
	 * transitional) to the free list; this also discards any
	 * partially reassembled data from those nodes.
	 */
	for (idx = 0; idx < left_list_entries; idx++) {
		assembly_deref_from_normal_and_trans (left_list[idx]);
	}

	/*
	 * Fan the configuration change out to each subscriber that
	 * registered a confchg callback.
	 */
	for (cur = totempg_groups_list.next;
		cur != &totempg_groups_list;
		cur = cur->next) {

		grp_inst = list_entry (cur, struct totempg_group_instance, list);

		if (grp_inst->confchg_fn == NULL) {
			continue;
		}
		grp_inst->confchg_fn (
			configuration_type,
			member_list, member_list_entries,
			left_list, left_list_entries,
			joined_list, joined_list_entries,
			ring_id);
	}
}
425
/*
 * Byte-swap the group header of an incoming message in place.
 * Layout: lens[0] is the group count, followed by one 16-bit length
 * per group. On strict-alignment architectures the swap is performed
 * on an aligned scratch copy when the buffer is misaligned, then
 * copied back.
 */
static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *lens;
	int idx;
	char *work;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for not i386 or x86_64
	 */
	if (((size_t)msg % 4) == 0) {
		work = msg;
	} else {
		work = alloca (msg_len);
		memcpy (work, msg, msg_len);
	}
#else
	work = msg;
#endif

	/*
	 * Swap the count first, then each of the count lengths
	 */
	lens = (unsigned short *)work;
	lens[0] = swab16 (lens[0]);
	for (idx = 1; idx <= lens[0]; idx++) {
		lens[idx] = swab16 (lens[idx]);
	}

	/*
	 * Propagate the converted values back to the caller's buffer
	 */
	if (work != msg) {
		memcpy (msg, work, msg_len);
	}
}
458
/*
 * Decide whether a packed message should be delivered to an instance
 * subscribed to groups_b[0..group_b_cnt-1].
 *
 * The message begins with a group header laid out as:
 *   unsigned short group_len[0]              - group count
 *   unsigned short group_len[1..count]       - length of each name
 *   char           names[]                   - concatenated group names
 * followed by the payload.
 *
 * *adjust_iovec is always set (match or not) to the total size of this
 * group header so the caller can strip it before delivery.
 *
 * Returns 1 when any group named in the message equals any group of
 * the instance (length and bytes), 0 otherwise.
 */
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	/* Callers always hand in exactly one iovec */
	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for not i386 or x86_64
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	/* group_name points just past the count + length words */
	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);


	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		/* advance to the next concatenated group name */
		group_name += group_len[i];
	}
	return (0);
}
515
516
app_deliver_fn(unsigned int nodeid,void * msg,unsigned int msg_len,int endian_conversion_required)517 static inline void app_deliver_fn (
518 unsigned int nodeid,
519 void *msg,
520 unsigned int msg_len,
521 int endian_conversion_required)
522 {
523 struct totempg_group_instance *instance;
524 struct iovec stripped_iovec;
525 unsigned int adjust_iovec;
526 struct iovec *iovec;
527 struct list_head *list;
528
529 struct iovec aligned_iovec = { NULL, 0 };
530
531 if (endian_conversion_required) {
532 group_endian_convert (msg, msg_len);
533 }
534
535 /*
536 * TODO: segmentation/assembly need to be redesigned to provide aligned access
537 * in all cases to avoid memory copies on non386 archs. Probably broke backwars
538 * compatibility
539 */
540
541 #ifdef TOTEMPG_NEED_ALIGN
542 /*
543 * Align data structure for not i386 or x86_64
544 */
545 aligned_iovec.iov_base = alloca(msg_len);
546 aligned_iovec.iov_len = msg_len;
547 memcpy(aligned_iovec.iov_base, msg, msg_len);
548 #else
549 aligned_iovec.iov_base = msg;
550 aligned_iovec.iov_len = msg_len;
551 #endif
552
553 iovec = &aligned_iovec;
554
555 for (list = totempg_groups_list.next;
556 list != &totempg_groups_list;
557 list = list->next) {
558
559 instance = list_entry (list, struct totempg_group_instance, list);
560 if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
561 stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
562 stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
563
564 #ifdef TOTEMPG_NEED_ALIGN
565 /*
566 * Align data structure for not i386 or x86_64
567 */
568 if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) {
569 /*
570 * Deal with misalignment
571 */
572 stripped_iovec.iov_base =
573 alloca (stripped_iovec.iov_len);
574 memcpy (stripped_iovec.iov_base,
575 (char *)iovec->iov_base + adjust_iovec,
576 stripped_iovec.iov_len);
577 }
578 #endif
579 instance->deliver_fn (
580 nodeid,
581 stripped_iovec.iov_base,
582 stripped_iovec.iov_len,
583 endian_conversion_required);
584 }
585 }
586 }
587
/*
 * Configuration-change callback handed to the mrp layer: a thin
 * adapter that forwards every change to the per-group dispatcher.
 * TODO optimize this
 */
static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	app_confchg_fn (configuration_type, member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries, ring_id);
}
602
/*
 * Receive one totempg mcast frame from the mrp layer: unpack the
 * packed messages it carries, reassemble fragments per sending node
 * and hand complete messages to app_deliver_fn(). Implements the
 * "ASSEMBLY AND UNPACKING ALGORITHM" described at the top of this
 * file.
 */
static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */

	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	/*
	 * datasize = fixed header + one 16-bit length per packed message.
	 * NOTE(review): this assumes datasize <= FRAME_SIZE_MAX, i.e. that
	 * msg_count is bounded by what a frame can carry - presumably
	 * guaranteed by the sending side; confirm for untrusted peers.
	 */
	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	/* Copy header+lengths to an aligned local so lens can be swapped */
	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	if (endian_conversion_required) {
		for (i = 0; i < mcast->msg_count; i++) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}
	}

	/*
	 * Append the payload at the current end of this node's assembly
	 * buffer (index is the end of any fragment carried over earlier)
	 */
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it. We'll first deliver the full messages
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, that it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message. Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			/* Skip the discarded first block; deliver from msg 1 on */
			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			/* Deliver each complete message, advancing index */
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
				continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			/* Slide the trailing fragment to the buffer start */
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);

			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}
725
726 /*
727 * Totem Process Group Abstraction
728 * depends on poll abstraction, POSIX, IPV4
729 */
730
731 void *callback_token_received_handle;
732
/*
 * Token-received callback: flush any messages staged in the
 * fragmentation buffer by mcast_msg() as a single multicast frame.
 *
 * Declared static at the top of this file; the definition carries the
 * linkage of that earlier declaration. Always returns 0; does nothing
 * when the staging buffer is empty or no transmit slots are free.
 */
int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	/* Nothing staged - nothing to send */
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	/* No room in the transmit queue - try again on a later token */
	if (totemmrp_avail() == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	/* Frame = header, packed-length array, staged message data */
	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemmrp_mcast (iovecs, 3, 0);

	/* Staging buffer is now empty */
	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}
783
784 /*
785 * Initialize the totem process group abstraction
786 */
/*
 * Initialize the totem process group abstraction.
 *
 * Copies the logging configuration out of totem_config, allocates the
 * fragmentation staging buffer, brings up the multi-ring protocol
 * layer and registers the token-received flush callback.
 *
 * Returns 0 on success, -1 on failure.
 *
 * Fix: the fragmentation buffer is now released when
 * totemmrp_initialize() fails instead of being leaked.
 */
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;
	totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
	totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
	totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;

	/*
	 * TOTEMPG_PACKET_SIZE reads totempg_totem_config, assigned above
	 */
	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == NULL) {
		return (-1);
	}

	totemsrp_net_mtu_adjust (totem_config);

	res = totemmrp_initialize (
		poll_handle,
		totem_config,
		&totempg_stats,
		totempg_deliver_fn,
		totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);

	if (res == -1) {
		/*
		 * Don't leak the staging buffer when the mrp layer
		 * fails to come up
		 */
		free (fragmentation_data);
		fragmentation_data = NULL;
		return (res);
	}

	totemmrp_callback_token_create (
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	/*
	 * Upper bound on a single totempg message: queue capacity minus
	 * one frame, times the per-frame payload (16 bytes of slack per
	 * frame reserved for the packed length fields)
	 */
	totempg_size_limit = (totemmrp_avail() - 1) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	list_init (&totempg_groups_list);

	return (res);
}
837
totempg_finalize(void)838 void totempg_finalize (void)
839 {
840 if (totempg_threaded_mode == 1) {
841 pthread_mutex_lock (&totempg_mutex);
842 }
843 totemmrp_finalize ();
844 if (totempg_threaded_mode == 1) {
845 pthread_mutex_unlock (&totempg_mutex);
846 }
847 }
848
849 /*
850 * Multicast a message
851 */
/*
 * Multicast a message described by iovec_in[0..iov_len-1].
 *
 * Small messages are packed into the fragmentation staging buffer and
 * flushed later by callback_token_received_fn(); anything that fills a
 * frame is sent immediately, possibly split across several frames with
 * the fragmented/continuation fields linking the pieces. Implements
 * the "FRAGMENTATION AND PACKING ALGORITHM" described at the top of
 * this file.
 *
 * Returns 0 on success, -1 when the transmit queue has no room or a
 * send fails.
 */
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	totemmrp_event_signal (TOTEM_EVENT_NEW_MSG, 1);

	/*
	 * Remove zero length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;

	/*
	 * Room left in the current frame once the existing packed
	 * lengths plus one new length word are accounted for
	 */
	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	/*
	 * Check if we would overwrite new message queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return(-1);
	}

	/*
	 * NOTE(review): only header.version is initialized here;
	 * header.type is left unset on this path (callback_token_received_fn
	 * sets both) - confirm whether receivers ever read type.
	 */
	mcast.header.version = 0;
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		/* Bytes of the current iovec not yet sent */
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment_buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment_buffer on the next call.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;

		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			/*
			 * When the chunk alone fills a frame (staging buffer
			 * necessarily empty) send straight from the iovec,
			 * otherwise send from the staging buffer
			 */
			if( copy_len == max_packet_size )
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			/*
			 * if we're not on the last iovec or the iovec is too large to
			 * fit, then indicate a fragment. This also means that the next
			 * message will have the continuation of this one.
			 */
			if ((i < (iov_len - 1)) ||
					((copy_base + copy_len) < iovec[i].iov_len)) {
				/* next_fragment wraps at 255; keep it non-zero */
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
			}

			/*
			 * assemble the message and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof(struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;
			assert (totemmrp_avail() > 0);
			res = totemmrp_mcast (iovecs, 3, guarantee);
			if (res == -1) {
				goto error_exit;
			}

			/*
			 * Recalculate counts and indexes for the next.
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));

			/*
			 * If the iovec all fit, go to the next iovec
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;

			/*
			 * Continue with the rest of the current iovec.
			 */
			} else {
				copy_base += copy_len;
			}
		}
	}

	/*
	 * Bump only if we added message data. This may be zero if
	 * the last buffer just fit into the fragmentation_data buffer
	 * and we were at the last iovec.
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}
1023
1024 /*
1025 * Determine if a message of msg_size could be queued
1026 */
msg_count_send_ok(int msg_count)1027 static int msg_count_send_ok (
1028 int msg_count)
1029 {
1030 int avail = 0;
1031
1032 avail = totemmrp_avail ();
1033 totempg_stats.msg_queue_avail = avail;
1034
1035 return ((avail - totempg_reserved) > msg_count);
1036 }
1037
byte_count_send_ok(int byte_count)1038 static int byte_count_send_ok (
1039 int byte_count)
1040 {
1041 unsigned int msg_count = 0;
1042 int avail = 0;
1043
1044 avail = totemmrp_avail ();
1045
1046 msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1047
1048 return (avail >= msg_count);
1049 }
1050
send_reserve(int msg_size)1051 static int send_reserve (
1052 int msg_size)
1053 {
1054 unsigned int msg_count = 0;
1055
1056 msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1057 totempg_reserved += msg_count;
1058 totempg_stats.msg_reserved = totempg_reserved;
1059
1060 return (msg_count);
1061 }
1062
/*
 * Give back msg_count transmit-queue slots previously taken with
 * send_reserve(), and refresh the exported statistic.
 */
static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
	totempg_stats.msg_reserved = totempg_reserved;
}
1069
#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */

/*
 * Percentage of the transmit queue currently in use; slots reserved via
 * send_reserve() count as used.  ("precent" is a historical typo in the
 * name, kept so in-file callers continue to work.)
 */
static uint32_t q_level_precent_used(void)
{
	return (100 - (((totemmrp_avail() - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}
1079
totempg_callback_token_create(void ** handle_out,enum totem_callback_token_type type,int delete,int (* callback_fn)(enum totem_callback_token_type type,const void *),const void * data)1080 int totempg_callback_token_create (
1081 void **handle_out,
1082 enum totem_callback_token_type type,
1083 int delete,
1084 int (*callback_fn) (enum totem_callback_token_type type, const void *),
1085 const void *data)
1086 {
1087 unsigned int res;
1088 if (totempg_threaded_mode == 1) {
1089 pthread_mutex_lock (&callback_token_mutex);
1090 }
1091 res = totemmrp_callback_token_create (handle_out, type, delete,
1092 callback_fn, data);
1093 if (totempg_threaded_mode == 1) {
1094 pthread_mutex_unlock (&callback_token_mutex);
1095 }
1096 return (res);
1097 }
1098
totempg_callback_token_destroy(void * handle_out)1099 void totempg_callback_token_destroy (
1100 void *handle_out)
1101 {
1102 if (totempg_threaded_mode == 1) {
1103 pthread_mutex_lock (&callback_token_mutex);
1104 }
1105 totemmrp_callback_token_destroy (handle_out);
1106 if (totempg_threaded_mode == 1) {
1107 pthread_mutex_unlock (&callback_token_mutex);
1108 }
1109 }
1110
1111 /*
1112 * vi: set autoindent tabstop=4 shiftwidth=4 :
1113 */
1114
/*
 * Allocate a new process-group instance, register its deliver and
 * configuration-change callbacks, and link it onto the global
 * totempg_groups_list.  On success *totempg_groups_instance receives
 * the new handle and 0 is returned; on allocation failure -1 is
 * returned and *totempg_groups_instance is left untouched.
 */
int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	/* No groups joined yet; totempg_groups_join() grows this array. */
	instance->groups = 0;
	instance->groups_cnt = 0;
	/*
	 * NOTE(review): q_level is compared against TOTEM_Q_LEVEL_* values
	 * in check_q_level() but initialized here with QB_LOOP_MED — confirm
	 * these two enums intentionally share numeric values.
	 */
	instance->q_level = QB_LOOP_MED;
	list_init (&instance->list);
	list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}
1162
totempg_groups_join(void * totempg_groups_instance,const struct totempg_group * groups,size_t group_cnt)1163 int totempg_groups_join (
1164 void *totempg_groups_instance,
1165 const struct totempg_group *groups,
1166 size_t group_cnt)
1167 {
1168 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1169 struct totempg_group *new_groups;
1170 unsigned int res = 0;
1171
1172 if (totempg_threaded_mode == 1) {
1173 pthread_mutex_lock (&totempg_mutex);
1174 }
1175
1176 new_groups = realloc (instance->groups,
1177 sizeof (struct totempg_group) *
1178 (instance->groups_cnt + group_cnt));
1179 if (new_groups == 0) {
1180 res = ENOMEM;
1181 goto error_exit;
1182 }
1183 memcpy (&new_groups[instance->groups_cnt],
1184 groups, group_cnt * sizeof (struct totempg_group));
1185 instance->groups = new_groups;
1186 instance->groups_cnt += group_cnt;
1187
1188 error_exit:
1189 if (totempg_threaded_mode == 1) {
1190 pthread_mutex_unlock (&totempg_mutex);
1191 }
1192 return (res);
1193 }
1194
totempg_groups_leave(void * totempg_groups_instance,const struct totempg_group * groups,size_t group_cnt)1195 int totempg_groups_leave (
1196 void *totempg_groups_instance,
1197 const struct totempg_group *groups,
1198 size_t group_cnt)
1199 {
1200 if (totempg_threaded_mode == 1) {
1201 pthread_mutex_lock (&totempg_mutex);
1202 }
1203
1204 if (totempg_threaded_mode == 1) {
1205 pthread_mutex_unlock (&totempg_mutex);
1206 }
1207 return (0);
1208 }
1209
1210 #define MAX_IOVECS_FROM_APP 32
1211 #define MAX_GROUPS_PER_MSG 32
1212
totempg_groups_mcast_joined(void * totempg_groups_instance,const struct iovec * iovec,unsigned int iov_len,int guarantee)1213 int totempg_groups_mcast_joined (
1214 void *totempg_groups_instance,
1215 const struct iovec *iovec,
1216 unsigned int iov_len,
1217 int guarantee)
1218 {
1219 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1220 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1221 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1222 int i;
1223 unsigned int res;
1224
1225 if (totempg_threaded_mode == 1) {
1226 pthread_mutex_lock (&totempg_mutex);
1227 }
1228
1229 /*
1230 * Build group_len structure and the iovec_mcast structure
1231 */
1232 group_len[0] = instance->groups_cnt;
1233 for (i = 0; i < instance->groups_cnt; i++) {
1234 group_len[i + 1] = instance->groups[i].group_len;
1235 iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
1236 iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
1237 }
1238 iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
1239 iovec_mcast[0].iov_base = group_len;
1240 for (i = 0; i < iov_len; i++) {
1241 iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
1242 iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
1243 }
1244
1245 res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
1246
1247 if (totempg_threaded_mode == 1) {
1248 pthread_mutex_unlock (&totempg_mutex);
1249 }
1250
1251 return (res);
1252 }
1253
/*
 * Re-evaluate the transmit-queue level for an instance and invoke the
 * registered totem_queue_level_changed callback when it changes.
 *
 * The bands deliberately leave gaps (30-40, 50-60, 70-75) so the level
 * does not flap when utilisation hovers near a boundary:
 *   >= 75        -> CRITICAL
 *   <  30        -> LOW
 *   40 < x < 50  -> GOOD
 *   60 < x < 70  -> HIGH
 *   otherwise    -> level unchanged (hysteresis)
 */
static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_precent_used();

	if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
		instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
	} else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
		instance->q_level = TOTEM_Q_LEVEL_LOW;
	} else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
		instance->q_level = TOTEM_Q_LEVEL_GOOD;
	} else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
		instance->q_level = TOTEM_Q_LEVEL_HIGH;
	}
	/* Only notify on an actual transition, and only if a callback is set. */
	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}
1274
/*
 * Public wrapper: re-evaluate the queue level for an instance and fire
 * the level-changed callback if it moved to a new band.
 */
void totempg_check_q_level(
	void *totempg_groups_instance)
{
	check_q_level((struct totempg_group_instance *)totempg_groups_instance);
}
1282
totempg_groups_joined_reserve(void * totempg_groups_instance,const struct iovec * iovec,unsigned int iov_len)1283 int totempg_groups_joined_reserve (
1284 void *totempg_groups_instance,
1285 const struct iovec *iovec,
1286 unsigned int iov_len)
1287 {
1288 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1289 unsigned int size = 0;
1290 unsigned int i;
1291 unsigned int reserved = 0;
1292
1293 if (totempg_threaded_mode == 1) {
1294 pthread_mutex_lock (&totempg_mutex);
1295 pthread_mutex_lock (&mcast_msg_mutex);
1296 }
1297
1298 for (i = 0; i < instance->groups_cnt; i++) {
1299 size += instance->groups[i].group_len;
1300 }
1301 for (i = 0; i < iov_len; i++) {
1302 size += iovec[i].iov_len;
1303 }
1304
1305 if (size >= totempg_size_limit) {
1306 reserved = -1;
1307 goto error_exit;
1308 }
1309
1310 if (byte_count_send_ok (size)) {
1311 reserved = send_reserve (size);
1312 } else {
1313 reserved = 0;
1314 }
1315
1316 error_exit:
1317 check_q_level(instance);
1318
1319 if (totempg_threaded_mode == 1) {
1320 pthread_mutex_unlock (&mcast_msg_mutex);
1321 pthread_mutex_unlock (&totempg_mutex);
1322 }
1323 return (reserved);
1324 }
1325
1326
totempg_groups_joined_release(int msg_count)1327 int totempg_groups_joined_release (int msg_count)
1328 {
1329 if (totempg_threaded_mode == 1) {
1330 pthread_mutex_lock (&totempg_mutex);
1331 pthread_mutex_lock (&mcast_msg_mutex);
1332 }
1333 send_release (msg_count);
1334 if (totempg_threaded_mode == 1) {
1335 pthread_mutex_unlock (&mcast_msg_mutex);
1336 pthread_mutex_unlock (&totempg_mutex);
1337 }
1338 return 0;
1339 }
1340
totempg_groups_mcast_groups(void * totempg_groups_instance,int guarantee,const struct totempg_group * groups,size_t groups_cnt,const struct iovec * iovec,unsigned int iov_len)1341 int totempg_groups_mcast_groups (
1342 void *totempg_groups_instance,
1343 int guarantee,
1344 const struct totempg_group *groups,
1345 size_t groups_cnt,
1346 const struct iovec *iovec,
1347 unsigned int iov_len)
1348 {
1349 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1350 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1351 int i;
1352 unsigned int res;
1353
1354 if (totempg_threaded_mode == 1) {
1355 pthread_mutex_lock (&totempg_mutex);
1356 }
1357
1358 /*
1359 * Build group_len structure and the iovec_mcast structure
1360 */
1361 group_len[0] = groups_cnt;
1362 for (i = 0; i < groups_cnt; i++) {
1363 group_len[i + 1] = groups[i].group_len;
1364 iovec_mcast[i + 1].iov_len = groups[i].group_len;
1365 iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
1366 }
1367 iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
1368 iovec_mcast[0].iov_base = group_len;
1369 for (i = 0; i < iov_len; i++) {
1370 iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
1371 iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
1372 }
1373
1374 res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
1375
1376 if (totempg_threaded_mode == 1) {
1377 pthread_mutex_unlock (&totempg_mutex);
1378 }
1379 return (res);
1380 }
1381
1382 /*
1383 * Returns -1 if error, 0 if can't send, 1 if can send the message
1384 */
totempg_groups_send_ok_groups(void * totempg_groups_instance,const struct totempg_group * groups,size_t groups_cnt,const struct iovec * iovec,unsigned int iov_len)1385 int totempg_groups_send_ok_groups (
1386 void *totempg_groups_instance,
1387 const struct totempg_group *groups,
1388 size_t groups_cnt,
1389 const struct iovec *iovec,
1390 unsigned int iov_len)
1391 {
1392 unsigned int size = 0;
1393 unsigned int i;
1394 unsigned int res;
1395
1396 if (totempg_threaded_mode == 1) {
1397 pthread_mutex_lock (&totempg_mutex);
1398 }
1399
1400 for (i = 0; i < groups_cnt; i++) {
1401 size += groups[i].group_len;
1402 }
1403 for (i = 0; i < iov_len; i++) {
1404 size += iovec[i].iov_len;
1405 }
1406
1407 res = msg_count_send_ok (size);
1408
1409 if (totempg_threaded_mode == 1) {
1410 pthread_mutex_unlock (&totempg_mutex);
1411 }
1412 return (res);
1413 }
1414
/*
 * Look up the interface addresses, status strings, and interface count
 * for nodeid; thin passthrough to the lower totem layer.
 */
int totempg_ifaces_get (
	unsigned int nodeid,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	return (totemmrp_ifaces_get (nodeid, interfaces, interfaces_size,
		status, iface_count));
}
1433
/*
 * Forward an external event notification to the lower totem layer.
 */
void totempg_event_signal (enum totem_event_type type, int value)
{
	totemmrp_event_signal (type, value);
}
1438
/*
 * Expose the module's statistics block.  The pointer refers to static
 * storage owned by this file; the caller must not free it.
 */
void* totempg_get_stats (void)
{
	return &totempg_stats;
}
1443
/*
 * Configure the cipher and hash used by the totem layer; returns the
 * lower layer's result code.
 */
int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	return (totemmrp_crypto_set (cipher_type, hash_type));
}
1454
/*
 * Re-enable a failed ring; returns the lower layer's result code.
 */
int totempg_ring_reenable (void)
{
	return (totemmrp_ring_reenable ());
}
1463
1464 #define ONE_IFACE_LEN 63
totempg_ifaces_print(unsigned int nodeid)1465 const char *totempg_ifaces_print (unsigned int nodeid)
1466 {
1467 static char iface_string[256 * INTERFACE_MAX];
1468 char one_iface[ONE_IFACE_LEN+1];
1469 struct totem_ip_address interfaces[INTERFACE_MAX];
1470 char **status;
1471 unsigned int iface_count;
1472 unsigned int i;
1473 int res;
1474
1475 iface_string[0] = '\0';
1476
1477 res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, &status, &iface_count);
1478 if (res == -1) {
1479 return ("no interface found for nodeid");
1480 }
1481
1482 res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, &status, &iface_count);
1483
1484 for (i = 0; i < iface_count; i++) {
1485 snprintf (one_iface, ONE_IFACE_LEN,
1486 "r(%d) ip(%s) ",
1487 i, totemip_print (&interfaces[i]));
1488 strcat (iface_string, one_iface);
1489 }
1490 return (iface_string);
1491 }
1492
/*
 * Return this node's totem nodeid (passthrough to the lower layer).
 */
unsigned int totempg_my_nodeid_get (void)
{
	return (totemmrp_my_nodeid_get());
}
1497
/*
 * Return this node's address family (passthrough to the lower layer).
 */
int totempg_my_family_get (void)
{
	return (totemmrp_my_family_get());
}
/*
 * Register a callback the totem layer invokes once services are ready.
 */
extern void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemmrp_service_ready_register (totem_service_ready);
}
1507
/*
 * Register the callback fired by check_q_level() when the transmit
 * queue moves between utilisation bands.
 */
void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}
1512
/*
 * Add a member address on the given ring; returns the lower layer's
 * result code.
 */
extern int totempg_member_add (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemmrp_member_add (member, ring_no);
}
1519
/*
 * Remove a member address from the given ring; returns the lower
 * layer's result code.
 */
extern int totempg_member_remove (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemmrp_member_remove (member, ring_no);
}
1526
/*
 * Switch this layer (and the lower totem layer) into threaded mode;
 * the mutex-guarded paths throughout this file take effect afterwards.
 */
void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
	totemmrp_threaded_mode_enable ();
}
1532
/*
 * Acknowledge a configuration transition (passthrough to the lower
 * layer).
 */
void totempg_trans_ack (void)
{
	totemmrp_trans_ack ();
}
1537
1538