1 /*
2 * The Spread Toolkit.
3 *
4 * The contents of this file are subject to the Spread Open-Source
5 * License, Version 1.0 (the ``License''); you may not use
6 * this file except in compliance with the License. You may obtain a
7 * copy of the License at:
8 *
9 * http://www.spread.org/license/
10 *
11 * or in the file ``license.txt'' found in this distribution.
12 *
13 * Software distributed under the License is distributed on an AS IS basis,
14 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15 * for the specific language governing rights and limitations under the
16 * License.
17 *
18 * The Creators of Spread are:
19 * Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
20 *
21 * Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
22 *
23 * All Rights Reserved.
24 *
25 * Major Contributor(s):
26 * ---------------
27 * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
28 * Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
29 * Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
30 * John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
31 *
32 */
33
34
35 #include "arch.h"
36 #include <string.h>
37 #include <stdio.h>
38 #include <assert.h>
39
40 #include "spread_params.h"
41 #include "net_types.h"
42 #include "protocol.h"
43 #include "prot_body.h"
44 #include "sess_types.h"
45 #include "sess_body.h"
46 #include "groups.h"
47 #include "objects.h"
48 #include "memory.h"
49 #include "sp_events.h"
50 #include "status.h"
51 #include "alarm.h"
52 #include "membership.h"
53 #if ( SPREAD_PROTOCOL > 3 )
54 #include "queues.h"
55 #endif
56 #include "skiplist.h"
57 #include "alarm.h"
58 #include "message.h"
59
60 #ifndef NULL
61 #define NULL 0
62 #endif /* NULL */
63
64 #define ESTABLISHED_MEMBER 0
65 #define NEW_MEMBER 1
66 #define PARTITIONED_MEMBER 2
67 #define Is_established_member( status ) ( status == ESTABLISHED_MEMBER )
68 #define Is_new_member( status ) ( status == NEW_MEMBER )
69 #define Is_partitioned_member( status ) ( status == PARTITIONED_MEMBER )
70
71 typedef struct dummy_groups_buf_link {
72 char buf[GROUPS_BUF_SIZE];
73 int bytes;
74 struct dummy_groups_buf_link *next;
75 } groups_buf_link;
76
77 struct worklist {
78 char name[MAX_GROUP_NAME];
79 Skiplist *groups;
80 };
81
printgroup(void * vgrp)82 char *printgroup(void *vgrp) {
83 group *grp = (group *)vgrp;
84 return grp->name;
85 }
86
87 static int Gstate;
88 static configuration Trans_memb;
89 static membership_id Trans_memb_id;
90 static configuration Reg_memb;
91 static membership_id Reg_memb_id;
92 static char Mess_buf[MAX_MESSAGE_BODY_LEN];
93
94 static groups_buf_link *Groups_bufs;
95 static int Num_mess_gathered;
96 static int Num_daemons_gathered;
97 static message_link Gathered; /* Groups messages */
98
99 #if 0
100 static group Work[MAX_PROCS_RING+1];
101 #endif
102 static int Groups_control_down_queue;
103
104 static int G_id_is_equal( group_id g1, group_id g2 );
105 static void G_print_group_id( group_id g );
106 static group *G_get_group( char *group_name );
107 static member *G_get_member( group *grp, char *private_group_name );
108 static int G_build_memb_buf( group *grp, message_obj *msg,
109 char buf[] );
110 static int G_build_memb_vs_buf( group *grp, message_obj *msg,
111 char buf[], int32 caused );
112 static message_link *G_build_trans_mess( group *grp );
113
114 static void G_stamp_groups_buf( char buf[] );
115 static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes );
116 static int G_build_groups_buf( char buf[], struct skiplistnode **iter_ptr );
117 static int G_mess_to_groups( message_link *mess_link, char *name,
118 struct worklist *work );
119 static int G_smallest_group_indices( Skiplist *work, struct worklist *indices[] );
120 static void G_compute_and_notify(void);
121 static void G_print(void);
122 static void G_empty_groups_bufs(void);
123
124 static Skiplist GroupsList;
125
126 static int G_compare(void *, void *);
127
G_compare(void * a,void * b)128 static int G_compare(void *a, void *b)
129 {
130 /* Takes two Group records and compares them based on their keys (name) */
131 /* This will work for any record type that has a char[LENGTH] as the first
132 member of the struct and is the key */
133 assert(a);
134 assert(b);
135 return strcmp((char *)a, (char *)b);
136 }
137
G_member_recordcompare(void * a,void * b)138 int G_member_recordcompare(void *a, void *b)
139 {
140 int compared;
141 /* Takes two Member records and compares them based on their keys (name) */
142 member *am = (member *)a, *bm = (member *)b;
143 assert(a);
144 assert(b);
145 compared = strcmp( bm->private_name, am->private_name );
146 if(compared > 0)
147 return -1;
148 if(compared == 0)
149 return 0;
150 return 1;
151 }
G_member_keycompare(void * a,void * b)152 int G_member_keycompare(void *a, void *b)
153 {
154 int compared;
155 /* Takes two Member records and compares them based on their keys (name) */
156 member *bm = (member *)b;
157 char *am = (char *)a;
158 assert(a);
159 assert(b);
160 compared = strcmp( bm->private_name, am );
161 if(compared > 0)
162 return -1;
163 if(compared == 0) {
164 return 0;
165 }
166 return 1;
167 }
168 /* Take two worklist pointers and compare them by the ->groups pointers */
169 /* We do not actually care about the order they are stored in the skiplist
170 * that is why we are using meaningless pointers as keys.
171 */
G_work_groups_comp(void * a,void * b)172 static int G_work_groups_comp(void *a, void *b)
173 {
174 struct worklist *wA, *wB;
175 assert(a);
176 assert(b);
177 wA = (struct worklist *)a;
178 wB = (struct worklist *)b;
179
180 if (wA->groups < wB->groups)
181 return(-1);
182 if (wA->groups == wB->groups)
183 return(0);
184 return(1);
185 }
G_work_groups_keycomp(void * key,void * b)186 static int G_work_groups_keycomp(void *key, void *b)
187 {
188 struct worklist *wB;
189 Skiplist *wKey;
190 assert(key);
191 assert(b);
192 wB = (struct worklist *)b;
193 wKey = (Skiplist *)key;
194 if (wKey < wB->groups)
195 return(-1);
196 if (wKey == wB->groups)
197 return(0);
198 return(1);
199 }
200 #if 0
201 int G_group_revproc_comp(void *a, void *b) {
202 int32 aid, bid;
203 struct worklist *A=(struct worklist *)a, *B=(struct worklist *)b;
204 aid=A->proc_index;
205 bid=B->proc_index;
206 return (aid>bid)?(-1):((aid==bid)?0:1);
207 }
208 int G_group_revproc_keycomp(void *a, void *b) {
209 int32 aid, bid;
210 struct worklist *B=(struct worklist *)b;
211 bid=B->proc_index;
212 aid = *(int32 *)a;
213 return (aid>bid)?-1:((aid==bid)?0:1);
214 }
215 #endif
G_init()216 void G_init()
217 {
218 int ret;
219
220 Alarm( GROUPS, "G_init: \n" );
221
222 Num_groups = 0;
223 GlobalStatus.num_groups = Num_groups;
224
225 sl_init(&GroupsList);
226 /* Key's address is the same as records address and is a null
227 terminated string, so we can use the same function to compare
228 record<->record
229 key<->record */
230 sl_set_compare(&GroupsList,
231 G_compare,
232 G_compare);
233
234 /* Groups.next = NULL;
235 Groups.prev = NULL; */
236
237 ret = Mem_init_object(GROUP, sizeof(group), 100, 0);
238 if (ret < 0)
239 {
240 Alarm(EXIT, "G_init: Failure to Initialize GROUP memory objects\n");
241 }
242
243 ret = Mem_init_object(MEMBER, sizeof(member), 200, 0);
244 if (ret < 0)
245 {
246 Alarm(EXIT, "G_init: Failure to Initialize MEMBER memory objects\n");
247 }
248 ret = Mem_init_object(GROUPS_BUF_LINK, sizeof(groups_buf_link), 1, 1);
249 if( ret < 0 )
250 {
251 Alarm(EXIT, "G_init: Failure to Initialize GROUPS_BUF_LINK memory objects\n");
252 }
253 #if ( SPREAD_PROTOCOL == 3 )
254 Groups_control_down_queue = 0;
255 #else
256 Groups_control_down_queue = init_queuesess(Groups_down_qs);
257 #endif
258
259 Groups_bufs = NULL;
260 Num_mess_gathered = 0;
261 Num_daemons_gathered = 0;
262 Gathered.next = NULL;
263
264 Gstate = GOP;
265 GlobalStatus.gstate = Gstate;
266 }
267
G_handle_reg_memb(configuration reg_memb,membership_id reg_memb_id)268 void G_handle_reg_memb( configuration reg_memb, membership_id reg_memb_id )
269 {
270 group *grp, *nextgroup;
271 member *mbr, *nextmember;
272 struct skiplistnode *giter, *iter, *passed_iter;
273 groups_buf_link *grps_buf_link;
274 message_link *mess_link;
275 down_link *down_ptr;
276 message_obj *msg;
277 message_header *head_ptr;
278 int num_bytes;
279 int needed;
280 int ses;
281 int i;
282
283
284 Alarm( GROUPS, "G_handle_reg_memb: with (%d.%d.%d.%d, %d) id\n",
285 IP1(reg_memb_id.proc_id),IP2(reg_memb_id.proc_id),IP3(reg_memb_id.proc_id),
286 IP4(reg_memb_id.proc_id), reg_memb_id.time );
287
288 switch( Gstate )
289 {
290 case GOP:
291 Alarm( EXIT, "G_handle_reg_memb in GOP\n");
292
293 break;
294
295 case GTRANS:
296 /*
297 * Save reg_memb and reg_memb_id
298 * if previous Trans_memb is equal to reg_memb then:
299 * for every changed group
300 * eliminate partitioned members
301 * set Grp_id to (reg_memb_id, 1)
302 * notify local members of regular membership
303 * Shift to GOP
304 * else
305 * for every changed group
306 * eliminate partitioned members
307 * set Grp_id to (reg_memb_id, -1)
308 * Replace protocol queue, raise event thershold
309 * Build groups message -- only local members
310 * Send groups message
311 * Shift to GGATHER
312 */
313 Alarm( GROUPS, "G_handle_reg_memb in GTRANS\n");
314
315 Reg_memb = reg_memb;
316 Reg_memb_id = reg_memb_id;
317
318 if( Conf_num_procs( &Trans_memb ) == Conf_num_procs( &Reg_memb ) )
319 {
320 giter = sl_getlist( &GroupsList );
321 grp = (giter)?(group *)giter->data:NULL;
322 for( ; grp != NULL ; grp = nextgroup )
323 {
324 nextgroup = sl_next( &GroupsList, &giter );
325 if( grp->changed )
326 {
327 /* The group has changed */
328 /* eliminating partitioned members */
329 iter = sl_getlist( &grp->MembersList );
330 mbr = (iter)?(member *)iter->data:NULL;
331 for( ; mbr != NULL ; mbr = nextmember )
332 {
333 nextmember = sl_next( &grp->MembersList, &iter );
334 if( Is_partitioned_member( mbr->status ) )
335 {
336 /* discard this member - proc no longer in membership */
337 sl_remove ( &grp->MembersList,mbr->private_name, dispose);
338 grp->num_members--;
339 }
340 }
341 if( grp->num_members == 0 )
342 {
343 /* discard this empty group */
344 sl_destruct ( &grp->MembersList, dispose);
345 sl_remove ( &GroupsList, grp->name, dispose);
346 Num_groups--;
347 GlobalStatus.num_groups = Num_groups;
348 }else{
349 Alarm( GROUPS, "G_handle_reg_memb: skipping state transfer for group %s.\n", grp->name );
350 Alarm( DEBUG, "G_handle_reg_memb: changing group_id from: " );
351 G_print_group_id( grp->grp_id );
352 grp->grp_id.memb_id = Reg_memb_id;
353 grp->grp_id.index = 1;
354 grp->changed = 0;
355 Alarm( DEBUG, " to: " );
356 G_print_group_id( grp->grp_id );
357 Alarm( DEBUG, "\n" );
358
359 if( grp->num_local > 0 ){
360 /* send members regular membership */
361 msg = Message_new_message();
362 num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );
363
364 /* create the mess_link */
365 mess_link = new( MESSAGE_LINK );
366 Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
367 mess_link->mess = msg;
368 Obj_Inc_Refcount(mess_link->mess);
369
370 /* notify local members */
371 needed = 0;
372 for( i=0; i < grp->num_local; i++ )
373 {
374 ses = Sess_get_session_index ( grp->mbox[i] );
375 if( Is_memb_session( Sessions[ ses ].status ) )
376 Sess_write( ses, mess_link, &needed );
377 }
378 Message_Dec_Refcount(msg);
379 if( !needed ) Sess_dispose_message( mess_link );
380 }
381 }
382 }
383 }
384 Gstate = GOP;
385 GlobalStatus.gstate = Gstate;
386 }else{
387 /*
388 * else
389 * for every changed group
390 * eliminate partitioned members
391 * set Grp_id to (reg_memb_id, -1)
392 * Replace protocol queue, raise event thershold
393 * build groups message -- only local members
394 * Send groups message
395 * Shift to GGATHER
396 */
397 giter = sl_getlist( &GroupsList );
398 grp = (giter)?(group *)giter->data:NULL;
399 for( ; grp != NULL ; grp = nextgroup )
400 {
401 nextgroup = sl_next( &GroupsList, &giter );
402 if( grp->changed )
403 {
404 /* The group has changed */
405 /* eliminating partitioned members */
406 iter = sl_getlist( &grp->MembersList );
407 mbr = (iter)?(member *)iter->data:NULL;
408 for( ; mbr != NULL ; mbr = nextmember )
409 {
410 nextmember = sl_next( &grp->MembersList, &iter );
411 if( Is_partitioned_member( mbr->status ) )
412 {
413 /* discard this member - proc no longer in membership */
414 sl_remove ( &grp->MembersList,mbr->private_name, dispose);
415 grp->num_members--;
416 }
417 }
418 if( grp->num_members == 0 )
419 {
420 /* discard this empty group */
421 sl_destruct ( &grp->MembersList, dispose);
422 sl_remove ( &GroupsList, grp->name, dispose);
423 Num_groups--;
424 GlobalStatus.num_groups = Num_groups;
425 }
426 }
427 }
428 /* raise events threshold */
429 Session_threshold = MEDIUM_PRIORITY;
430 Sess_set_active_threshold();
431
432 /* Replace down queue */
433 Prot_set_down_queue( GROUPS_DOWNQUEUE );
434
435 /* build and send Groups message */
436 /* Nowadays, we can send multiple groups messages. No group has
437 * data in more than one. As an optimization, only the last message is
438 * AGREED, and all previous ones are RELIABLE. G_handle_groups uses this
439 * knowledge when parsing Groups messages. */
440 passed_iter = NULL;
441 do {
442 msg = Message_new_message();
443 grps_buf_link = new( GROUPS_BUF_LINK );
444 grps_buf_link->next = Groups_bufs;
445 Groups_bufs = grps_buf_link;
446 grps_buf_link->bytes = G_build_groups_buf(grps_buf_link->buf, &passed_iter);
447 G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
448 head_ptr = Message_get_message_header(msg);
449 if( passed_iter )
450 head_ptr->type |= RELIABLE_MESS;
451 else
452 head_ptr->type |= AGREED_MESS;
453 Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
454
455 down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
456 down_ptr->mess = msg;
457 Obj_Inc_Refcount(down_ptr->mess);
458 /* Use control queue--not normal session queues */
459 Prot_new_message( down_ptr, Groups_control_down_queue );
460 Message_Dec_Refcount(msg);
461 Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GTRANS with %d bytes\n",
462 (passed_iter) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
463 } while( passed_iter != NULL );
464
465 Gstate = GGATHER;
466 GlobalStatus.gstate = Gstate;
467 }
468 break;
469
470 case GGATHER:
471 Alarm( EXIT, "G_handle_reg_memb in GGATHER\n");
472
473 break;
474
475 case GGT:
476 /*
477 * Save reg_memb and reg_memb_id
478 * Clear all retained Groups messages
479 * Stamp own Groups message with current membership id
480 * Send group message
481 * Shift to GGATHER
482 */
483 Alarm( GROUPS, "G_handle_reg_memb in GGT\n");
484
485 Reg_memb = reg_memb;
486 Reg_memb_id = reg_memb_id;
487
488 /* Clear retained Groups messages in Gathered */
489 for( i=0; i < Num_mess_gathered; i++ )
490 {
491 mess_link = Gathered.next;
492 Gathered.next = mess_link->next;
493 Sess_dispose_message( mess_link );
494 }
495 Num_mess_gathered = 0;
496 Num_daemons_gathered = 0;
497
498 for( grps_buf_link = Groups_bufs; grps_buf_link; grps_buf_link = grps_buf_link->next )
499 {
500 /* Stamp own Groups message in buffer with current membership id */
501 G_stamp_groups_buf( grps_buf_link->buf );
502
503 /* send Groups message */
504 msg = Message_new_message();
505 G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
506 head_ptr = Message_get_message_header(msg);
507 if( grps_buf_link->next )
508 head_ptr->type |= RELIABLE_MESS;
509 else
510 head_ptr->type |= AGREED_MESS;
511 Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
512
513 down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
514 down_ptr->mess = msg;
515 Obj_Inc_Refcount(down_ptr->mess);
516 /* Use control queue--not normal session queues */
517 Prot_new_message( down_ptr, Groups_control_down_queue );
518 Message_Dec_Refcount(msg);
519 Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GGT with %d bytes\n",
520 (grps_buf_link->next) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
521 }
522
523 Gstate = GGATHER;
524 GlobalStatus.gstate = Gstate;
525
526 break;
527 }
528 }
529
G_handle_trans_memb(configuration trans_memb,membership_id trans_memb_id)530 void G_handle_trans_memb( configuration trans_memb, membership_id trans_memb_id )
531 {
532 group *grp, *nextgroup;
533 member *mbr, *nextmember;
534 struct skiplistnode *giter, *iter;
535 int group_changed;
536 message_link *mess_link;
537 int needed;
538 int ses;
539 int i;
540
541 Alarm( GROUPS, "G_handle_trans_memb: \n" );
542
543 switch( Gstate )
544 {
545 case GOP:
546 /*
547 * Save transitional membership
548 * For every group that has members that are not in the trans_memb do:
549 * mark group members that are not in trans_memb as partitioned.
550 * notify local members with an empty transitional group mess.
551 * mark group as changed (index = -1)
552 * Shift to GTRANS
553 */
554 Alarm( GROUPS, "G_handle_trans_memb in GOP\n");
555
556 Trans_memb = trans_memb;
557 Trans_memb_id = trans_memb_id;
558 Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
559 " {proc_id: %d "
560 " time: %d}\n", Trans_memb_id.proc_id, Trans_memb_id.time );
561
562 giter = sl_getlist( &GroupsList );
563 grp = (giter)?(group *)giter->data:NULL;
564 for( ; grp != NULL ; grp = nextgroup )
565 {
566 nextgroup = sl_next( &GroupsList, &giter );
567 group_changed = 0;
568 iter = sl_getlist( &grp->MembersList );
569 mbr = (iter)?(member *)iter->data:NULL;
570 for( ; mbr != NULL ; mbr = nextmember )
571 {
572 nextmember = sl_next( &grp->MembersList, &iter );
573 if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
574 {
575 /* mark this member as partitioned - proc no longer in membership */
576 mbr->status = PARTITIONED_MEMBER;
577 group_changed = 1;
578 }
579 }
580 if( group_changed )
581 {
582 if( grp->num_local > 0 )
583 {
584 /* send members transitional membership */
585 mess_link = G_build_trans_mess( grp );
586 needed = 0;
587 for( i=0; i < grp->num_local; i++ )
588 {
589 ses = Sess_get_session_index ( grp->mbox[i] );
590 if( Is_memb_session( Sessions[ ses ].status ) )
591 Sess_write( ses, mess_link, &needed );
592 }
593 if( !needed ) Sess_dispose_message( mess_link );
594 }
595 Alarm( DEBUG, "G_handle_trans_memb: changed group %s in GOP, change"
596 " group id from: ", grp->name );
597 G_print_group_id( grp->grp_id );
598 grp->grp_id.memb_id = trans_memb_id;
599 grp->grp_id.index = 1; /* Not technically needed, but not bad, either. */
600 grp->changed = 1;
601 Alarm( DEBUG, " to: " );
602 G_print_group_id( grp->grp_id );
603 Alarm( DEBUG, "\n" );
604 }
605 }
606
607 Gstate = GTRANS;
608 GlobalStatus.gstate = Gstate;
609
610 break;
611
612 case GTRANS:
613 Alarm( EXIT, "G_handle_trans_memb in GTRANS\n");
614
615 break;
616
617 case GGATHER:
618 /*
619 * Save transitional membership
620 * For every group that has members that are not in the
621 * trans_memb do:
622 * discard group members that are not in trans_memb
623 * if group is changed then mark it as changed (index = -1) (it might be already changed, but its ok).
624 * Shift to GGT
625 *
626 * Note: there is no need to notify local members with a transitional group mess
627 * becuase no message will come between the trans group memb and the next reg group memb.
628 * Note: this cascading deletes of members that are not in transitional membership actually
629 * opens the door for implementation of the ERSADS97 algorithm.
630 */
631 Alarm( GROUPS, "G_handle_trans_memb in GGATHER\n");
632
633 Trans_memb = trans_memb;
634 Trans_memb_id = trans_memb_id; /* Need this because we deliver the transitional again if we complete
635 * the state exchange during GGT. */
636 Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
637 " {proc_id: %d "
638 " time: %d}\n", Trans_memb_id.proc_id, Trans_memb_id.time );
639
640 giter = sl_getlist( &GroupsList );
641 grp = (giter)?(group *)giter->data:NULL;
642 for( ; grp != NULL ; grp = nextgroup )
643 {
644 nextgroup = sl_next( &GroupsList, &giter );
645 group_changed = 0;
646 iter = sl_getlist( &grp->MembersList );
647 mbr = (iter)?(member *)iter->data:NULL;
648 for( ; mbr != NULL ; mbr = nextmember )
649 {
650 nextmember = sl_next( &grp->MembersList, &iter );
651 if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
652 {
653 /* discard this member - proc no longer in membership */
654 sl_remove ( &grp->MembersList,mbr->private_name, dispose);
655 grp->num_members--;
656
657 group_changed = 1;
658 }
659 }
660 if( grp->num_members == 0 )
661 {
662 /* discard this empty group */
663 sl_destruct ( &grp->MembersList, dispose);
664 sl_remove ( &GroupsList, grp->name, dispose);
665 Num_groups--;
666 GlobalStatus.num_groups = Num_groups;
667
668 }else if( group_changed ) {
669 grp->changed = 1;
670 }
671 }
672
673 Gstate = GGT;
674 GlobalStatus.gstate = Gstate;
675
676 break;
677
678 case GGT:
679 Alarm( EXIT, "G_handle_trans_memb in GGT\n");
680
681 break;
682 }
683 }
684
G_handle_join(char * private_group_name,char * group_name)685 void G_handle_join( char *private_group_name, char *group_name )
686 {
687 group *grp, *new_grp;
688 member *mbr, *new_mbr;
689 int needed;
690 char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
691 int num_bytes;
692 char proc_name[MAX_PROC_NAME];
693 char private_name[MAX_PRIVATE_NAME+1];
694 int new_p_ind, p_ind1;
695 proc new_p, p1;
696 int ses;
697 mailbox new_mbox;
698 message_link *mess_link;
699 message_header *head_ptr;
700 message_obj *msg, *joiner_msg;
701 char *vs_ptr; /* the virtual synchrony set */
702 int i;
703 int32u temp;
704
705 Alarm( GROUPS, "G_handle_join: %s joins group %s\n", private_group_name, group_name );
706
707 switch( Gstate )
708 {
709 case GOP:
710 case GTRANS:
711
712 if (Gstate == GOP) Alarm( GROUPS, "G_handle_join in GOP\n");
713 if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_join in GTRANS\n");
714
715 /*
716 * if already in group then ignore
717 * if the group is unchanged and new member is coming from alive daemon then:
718 * Insert to group as established
719 * Increment Grp_id
720 * Notify all local members of a regular membership caused by join
721 * else if group is changed and coming from alive daemon then
722 * Insert to group as new
723 * Increment Grp_id
724 * if there are local members then
725 * build a membership with all members, and vs set with all established members
726 * notify all local established members with that membership (caused by network)
727 * if new member is local
728 * notify new member with membership and self vs set (caused by network)
729 * notify all local members a transitional membership
730 * mark new member as established
731 * else (if new member is coming from a partitioned daemon then)
732 * Insert to group as partitioned
733 * Increment Grp_id, and mark group as changed if not already done
734 * if there are local members then
735 * build a membership with all members and vs set with all established members
736 * notify all local members with that membership (caused by network)
737 * notify all local members with transitional membership
738 *
739 * Note: remember that when delivering a regular message while in GTRANS, you should use the
740 * mbox list of the group. You should still be cautious when delivering memberships to take
741 * care of the fact that the new guy gets a different treatment.
742 */
743 G_private_to_names( private_group_name, private_name, proc_name );
744
745 new_p_ind = Conf_proc_by_name( proc_name, &new_p );
746 if( new_p_ind < 0 )
747 {
748 Alarm( PRINT, "G_handle_join: illegal proc_name %s in private_group %s \n",
749 proc_name, private_group_name );
750 return;
751 }
752 grp = G_get_group( group_name );
753 if( grp == NULL )
754 {
755 new_grp = new( GROUP );
756 memset( new_grp->name, 0, MAX_GROUP_NAME );
757 strcpy( new_grp->name, group_name );
758 sl_init( &new_grp->MembersList );
759 sl_set_compare( &new_grp->MembersList,
760 G_member_recordcompare,
761 G_member_keycompare);
762 if( Gstate == GOP) {
763 new_grp->changed = 0;
764 new_grp->grp_id.memb_id = Reg_memb_id;
765 } else { /* Gtrans */
766 new_grp->changed = 1;
767 new_grp->grp_id.memb_id = Trans_memb_id;
768 }
769 new_grp->grp_id.index = 0; /* 0 because we will definitely increment it */
770 Alarm( DEBUG, "G_handle_join: New group added with group id: " );
771 G_print_group_id( new_grp->grp_id );
772 Alarm( DEBUG, "\n" );
773
774 new_grp->num_members = 0;
775 new_grp->num_local = 0;
776
777 sl_insert( &GroupsList, new_grp );
778 Num_groups++; /*sl need this?*/
779 GlobalStatus.num_groups = Num_groups;
780 grp = new_grp;
781 }
782 mbr = G_get_member( grp, private_group_name );
783 if( mbr != NULL )
784 {
785 Alarm( PRINT, "G_handle_join: %s is already in group %s\n",
786 private_group_name, group_name );
787 return;
788 }
789 /* Add a new member as ESTABLISHED (might change later on depending on the situation */
790 new_mbr = new( MEMBER );
791 memset( new_mbr->private_name, 0, MAX_GROUP_NAME );
792 strcpy( new_mbr->private_name, private_group_name );
793 new_mbr->proc_id = new_p.id;
794 new_mbr->status = ESTABLISHED_MEMBER;
795 new_mbr->p_ind = new_p_ind;
796
797 sl_insert( &grp->MembersList, new_mbr );
798 grp->num_members++;
799
800 /* if member is local then add mbox */
801 if( new_mbr->proc_id == My.id )
802 {
803 ses = Sess_get_session( private_name );
804 if( ses < 0 ) Alarm( EXIT, "G_handle_join: local session does not exist\n");
805 grp->mbox[ grp->num_local ] = Sessions[ ses ].mbox;
806 grp->num_local++;
807 new_mbox = Sessions[ ses ].mbox;
808 }else new_mbox = -1;
809
810 /* This is the meat */
811 if( Gstate == GOP || ( Conf_id_in_conf( &Trans_memb, new_p.id ) != -1 ) )
812 {
813 /* new member is coming from alive daemon */
814 if( !grp->changed )
815 {
816 /* group is unchanged */
817 /* Increment group id */
818 grp->grp_id.index++;
819
820 /* Notify local members */
821 if( grp->num_local > 0 )
822 {
823 msg = Message_new_message();
824 num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
825 head_ptr = Message_get_message_header(msg);
826
827 head_ptr->type |= CAUSED_BY_JOIN ;
828
829 /* notify all local members */
830 num_vs_ptr = &Mess_buf[ num_bytes ];
831 num_bytes += sizeof( int32 );
832 temp = 1;
833 memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
834
835 vs_ptr = (char *)&Mess_buf[ num_bytes ];
836 memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME );
837 num_bytes += MAX_GROUP_NAME;
838
839 head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
840
841 mess_link = new( MESSAGE_LINK );
842 Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
843 mess_link->mess = msg;
844 Obj_Inc_Refcount(mess_link->mess);
845
846 needed = 0;
847 for( i=0; i < grp->num_local; i++ )
848 {
849 ses = Sess_get_session_index ( grp->mbox[i] );
850 if( Is_memb_session( Sessions[ ses ].status ) )
851 Sess_write( ses, mess_link, &needed );
852 }
853 if ( !needed ) Sess_dispose_message( mess_link );
854 Message_Dec_Refcount(msg);
855 }
856 }else{
857 /* group is changed */
858 /* mark new member as new */
859 new_mbr->status = NEW_MEMBER;
860
861 /* Increment group id */
862 grp->grp_id.index++;
863
864 if( grp->num_local > 0 )
865 {
866 /* build a membership with all members, and vs set with all established members */
867 msg = Message_new_message();
868 num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );
869
870 /* notify all non-new local members with that membership (caused by network) */
871 mess_link = new( MESSAGE_LINK );
872 Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
873 mess_link->mess = msg;
874 Obj_Inc_Refcount(mess_link->mess);
875
876 needed = 0;
877 for( i=0; i < grp->num_local; i++ )
878 {
879 /* if new member is local we do not notify it here */
880 if( grp->mbox[i] == new_mbox ) continue;
881
882 ses = Sess_get_session_index ( grp->mbox[i] );
883 if( Is_memb_session( Sessions[ ses ].status ) )
884 Sess_write( ses, mess_link, &needed );
885 }
886 if ( !needed ) Sess_dispose_message( mess_link );
887 Message_Dec_Refcount(msg);
888
889 /* notify new member if local */
890 if( new_mbox != -1 )
891 {
892 /* build a membership with all members */
893 joiner_msg = Message_new_message();
894 num_bytes = G_build_memb_buf( grp, joiner_msg, Mess_buf );
895 head_ptr = Message_get_message_header(joiner_msg);
896 head_ptr->type |= CAUSED_BY_NETWORK ;
897 /* build a self vs set */
898 num_vs_ptr = &Mess_buf[ num_bytes ];
899 num_bytes += sizeof( int32 );
900 temp = 1;
901 memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
902 vs_ptr = (char *)&Mess_buf[ num_bytes ];
903 memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME );
904 num_bytes += MAX_GROUP_NAME;
905 head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
906 mess_link = new( MESSAGE_LINK );
907 Message_Buffer_to_Message_Fragments( joiner_msg, Mess_buf, num_bytes );
908 mess_link->mess = joiner_msg;
909 Obj_Inc_Refcount(mess_link->mess);
910
911 needed = 0;
912 ses = Sess_get_session_index ( new_mbox );
913 if( Is_memb_session( Sessions[ ses ].status ) )
914 Sess_write( ses, mess_link, &needed );
915 if ( !needed ) Sess_dispose_message( mess_link );
916 Message_Dec_Refcount(joiner_msg);
917 }
918 /* notify all local members a transitional membership */
919 mess_link = G_build_trans_mess( grp );
920 needed = 0;
921 for( i=0; i < grp->num_local; i++ )
922 {
923 ses = Sess_get_session_index ( grp->mbox[i] );
924 if( Is_memb_session( Sessions[ ses ].status ) )
925 Sess_write( ses, mess_link, &needed );
926 }
927 if( !needed ) Sess_dispose_message( mess_link );
928 }
929 /* Mark new member as established */
930 new_mbr->status = ESTABLISHED_MEMBER;
931 }
932 }else{
933 /* coming from a partitioned daemon */
934 /* mark new member as partitioned member */
935 new_mbr->status = PARTITIONED_MEMBER;
936 /*
937 * (marking group as changed - it might be already )
938 */
939 if( !grp->changed ) grp->changed = 1;
940 grp->grp_id.index++;
941 if( grp->num_local > 0 )
942 {
943 /* build a membership with all members, and vs set with all non-partitioned members */
944 msg = Message_new_message();
945 num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );
946
947 /* notify all local members with that membership (caused by network) */
948 mess_link = new( MESSAGE_LINK );
949 Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
950 mess_link->mess = msg;
951 Obj_Inc_Refcount(mess_link->mess);
952
953 needed = 0;
954 for( i=0; i < grp->num_local; i++ )
955 {
956 ses = Sess_get_session_index ( grp->mbox[i] );
957 if( Is_memb_session( Sessions[ ses ].status ) )
958 Sess_write( ses, mess_link, &needed );
959 }
960 if ( !needed ) Sess_dispose_message( mess_link );
961 Message_Dec_Refcount(msg);
962
963 /* notify all local members a transitional membership */
964 mess_link = G_build_trans_mess( grp );
965
966 needed = 0;
967 for( i=0; i < grp->num_local; i++ )
968 {
969 ses = Sess_get_session_index ( grp->mbox[i] );
970 if( Is_memb_session( Sessions[ ses ].status ) )
971 Sess_write( ses, mess_link, &needed );
972 }
973 if( !needed ) Sess_dispose_message( mess_link );
974 }
975 }
976 /* Compute the mask */
977 for(i=0; i<4; i++)
978 {
979 grp->grp_mask[i] = 0;
980 }
981 {
982 struct skiplistnode *iter;
983 member *memp;
984 /*
985 for( mbr= &grp->members; mbr->next != NULL; mbr=mbr->next )
986 {
987 p_ind1 = Conf_proc_by_id( mbr->next->proc_id, &p1 ); */
988 for( iter = sl_getlist( &grp->MembersList ),
989 memp=(member *)iter->data;
990 iter != NULL;
991 memp = (member *)sl_next( &grp->MembersList, &iter )) {
992 p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
993 temp = 1;
994 for(i=0; i<p1.seg_index%32; i++)
995 {
996 temp *= 2;
997 }
998 grp->grp_mask[p1.seg_index/32] |= temp;
999 }
1000 }
1001 Alarm(GROUPS, "G_handle_join: Mask for group %s set to %x %x %x %x\n",
1002 grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
1003
1004 break;
1005
1006 case GGATHER:
1007 Alarm( EXIT, "G_handle_join in GGATHER\n");
1008
1009 break;
1010
1011 case GGT:
1012 Alarm( EXIT, "G_handle_join in GGT\n");
1013
1014 break;
1015 }
1016 }
1017
G_handle_leave(char * private_group_name,char * group_name)1018 void G_handle_leave( char *private_group_name, char *group_name )
1019 {
1020
1021 char proc_name[MAX_PROC_NAME];
1022 char private_name[MAX_PRIVATE_NAME+1];
1023 char departing_private_group_name[MAX_GROUP_NAME];
1024 int p_ind, p_ind1;
1025 proc p, p1;
1026 group *grp;
1027 member *mbr;
1028 char *num_vs_ptr; /* num members in vs set */
1029 char *vs_ptr; /* the virtual synchrony set */
1030 message_link *mess_link;
1031 message_header *head_ptr;
1032 message_obj *msg;
1033 int num_bytes;
1034 int needed;
1035 int ses;
1036 int i, j;
1037 int32u temp;
1038
1039 Alarm( GROUPS, "G_handle_leave: %s leaves group %s\n", private_group_name, group_name );
1040
1041 switch( Gstate )
1042 {
1043 case GOP:
1044 case GTRANS:
1045
1046 if (Gstate == GOP) Alarm( GROUPS, "G_handle_leave in GOP\n");
1047 if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_leave in GTRANS\n");
1048
1049 /*
1050 * if not already in group then ignore
1051 * if this member is local, notify it and extract its mbox
1052 * Extract this member from group
1053 * if the group is unchanged (in GOP all groups are unchanged) then:
1054 * Increment Grp_id
1055 * Notify all local members of a regular membership caused by leave
1056 */
1057 G_private_to_names( private_group_name, private_name, proc_name );
1058 p_ind = Conf_proc_by_name( proc_name, &p );
1059 if( p_ind < 0 )
1060 {
1061 Alarm( PRINT, "G_handle_leave: illegal proc_name %s in private_group %s \n",
1062 proc_name, private_group_name );
1063 return;
1064 }
1065 grp = G_get_group( group_name );
1066 if( grp == NULL )
1067 {
1068 Alarm( PRINT, "G_handle_leave: group %s does not exist\n",
1069 group_name );
1070 return;
1071 }
1072 mbr = G_get_member( grp, private_group_name );
1073 if( mbr == NULL )
1074 {
1075 Alarm( PRINT, "G_handle_leave: member %s does not exist in group %s\n",
1076 private_group_name, group_name );
1077 return;
1078 }
1079
1080 if( p.id == My.id )
1081 {
1082 /* notify this local member and extract its mbox from group */
1083 msg = Message_new_message();
1084 head_ptr = Message_get_message_header(msg);
1085 head_ptr->type = CAUSED_BY_LEAVE;
1086 head_ptr->type = Set_endian( head_ptr->type );
1087 head_ptr->hint = Set_endian( 0 );
1088 memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
1089 head_ptr->num_groups = 0;
1090 head_ptr->data_len = 0;
1091
1092 /* create the mess_link */
1093 mess_link = new( MESSAGE_LINK );
1094 /* NOTE: Mess_buf contents are NOT used here. We only examine "0" bytes of it
1095 * We just need a valid pointer here to prevent faults */
1096 Message_Buffer_to_Message_Fragments( msg, Mess_buf, 0);
1097 mess_link->mess = msg;
1098 Obj_Inc_Refcount(mess_link->mess);
1099 /* notify member */
1100 needed = 0;
1101 ses = Sess_get_session( private_name );
1102 if( Is_memb_session( Sessions[ ses ].status ) )
1103 Sess_write( ses, mess_link, &needed );
1104 if( !needed ) Sess_dispose_message( mess_link );
1105
1106 /* extract this mbox */
1107 for( i=0, j=0; i < grp->num_local; i++,j++ )
1108 {
1109 if( grp->mbox[i] == Sessions[ses].mbox ) j--;
1110 else grp->mbox[j] = grp->mbox[i];
1111 }
1112 grp->num_local--;
1113 Message_Dec_Refcount(msg);
1114 }
1115
1116 /* extract this member from group */
1117 memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME );
1118 sl_remove( &grp->MembersList, mbr->private_name, dispose );
1119 grp->num_members--;
1120 if( grp->num_members == 0 )
1121 {
1122 /* discard this empty group */
1123 sl_destruct ( &grp->MembersList, dispose);
1124 sl_remove( &GroupsList, grp->name, dispose );
1125 Num_groups--;
1126 GlobalStatus.num_groups = Num_groups;
1127 return;
1128 }
1129
1130 if( grp->changed )
1131 {
1132 if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_leave: changed group in GOP\n");
1133 /*
1134 * If the group is changed (in GTRANS) then there is no need
1135 * to increment group id or to notify the local members.
1136 * They will get a group membership after the state transfer
1137 * terminates.
1138 */
1139 return;
1140 }
1141
1142 /* Increment group id */
1143 grp->grp_id.index++;
1144
1145 if( grp->num_local > 0 )
1146 {
1147 /* notify all local members */
1148 msg = Message_new_message();
1149 num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
1150 head_ptr = Message_get_message_header(msg);
1151 head_ptr->type |= CAUSED_BY_LEAVE ;
1152
1153 /* notify all local members */
1154 num_vs_ptr = &Mess_buf[ num_bytes ];
1155 num_bytes += sizeof( int32 );
1156 temp = 1;
1157 memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
1158
1159 vs_ptr = (char *)&Mess_buf[ num_bytes ];
1160 memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
1161 num_bytes += MAX_GROUP_NAME;
1162
1163 head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
1164
1165 mess_link = new( MESSAGE_LINK );
1166 Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
1167 mess_link->mess = msg;
1168 Obj_Inc_Refcount(mess_link->mess);
1169 needed = 0;
1170 for( i=0; i < grp->num_local; i++ )
1171 {
1172 ses = Sess_get_session_index ( grp->mbox[i] );
1173 if( Is_memb_session( Sessions[ ses ].status ) )
1174 Sess_write( ses, mess_link, &needed );
1175 }
1176 if ( !needed ) Sess_dispose_message( mess_link );
1177 Message_Dec_Refcount(msg);
1178 }
1179 /* Compute the mask */
1180 for(i=0; i<4; i++)
1181 {
1182 grp->grp_mask[i] = 0;
1183 }
1184 {
1185 struct skiplistnode *iter;
1186 member *memp;
1187 for( iter = sl_getlist( &grp->MembersList ),
1188 memp=(member *)iter->data;
1189 iter != NULL;
1190 memp = (member *)sl_next( &grp->MembersList, &iter )) {
1191 p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
1192 temp = 1;
1193 for(i=0; i<p1.seg_index%32; i++)
1194 {
1195 temp *= 2;
1196 }
1197 grp->grp_mask[p1.seg_index/32] |= temp;
1198 }
1199 }
1200
1201 Alarm(GROUPS, "G_handle_leave: Mask for group %s set to %x %x %x %x\n",
1202 grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
1203 break;
1204
1205 case GGATHER:
1206 Alarm( EXIT, "G_handle_leave in GGATHER\n");
1207
1208 break;
1209
1210 case GGT:
1211 Alarm( EXIT, "G_handle_leave in GGT\n");
1212
1213 break;
1214 }
1215 }
1216
G_handle_kill(char * private_group_name)1217 void G_handle_kill( char *private_group_name )
1218 {
1219 char proc_name[MAX_PROC_NAME];
1220 char private_name[MAX_PRIVATE_NAME+1];
1221 char departing_private_group_name[MAX_GROUP_NAME];
1222 int p_ind, p_ind1;
1223 proc p, p1;
1224 group *grp, *nextgroup;
1225 member *mbr;
1226 char *num_vs_ptr; /* num members in vs set */
1227 char *vs_ptr; /* the virtual synchrony set */
1228 message_link *mess_link;
1229 message_header *head_ptr;
1230 message_obj *msg;
1231 int num_bytes;
1232 int needed;
1233 int ses = -1; /* Fool compiler */
1234 int i, j;
1235 int32u temp;
1236 struct skiplistnode *giter;
1237
1238 Alarm( GROUPS, "G_handle_kill: %s is killed\n", private_group_name );
1239
1240 switch( Gstate )
1241 {
1242 case GOP:
1243 case GTRANS:
1244
1245 if (Gstate == GOP) Alarm( GROUPS, "G_handle_kill in GOP\n");
1246 if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_kill in GTRANS\n");
1247
1248 /*
1249 * for every group this guy is a member of
1250 * Extract this member from group
1251 * if the group is unchanged (in GOP all groups are unchanged) then:
1252 * Increment Grp_id
1253 * Notify all local members of a regular membership caused by disconnet
1254 */
1255 G_private_to_names( private_group_name, private_name, proc_name );
1256 p_ind = Conf_proc_by_name( proc_name, &p );
1257 if( p_ind < 0 )
1258 {
1259 Alarm( PRINT, "G_handle_kill: illegal proc_name %s in private_group %s \n",
1260 proc_name, private_group_name );
1261 return;
1262 }
1263
1264 if( p.id == My.id ) ses = Sess_get_session( private_name );
1265
1266 giter = sl_getlist( &GroupsList );
1267 grp = (giter)?(group *)giter->data:NULL;
1268 for( ; grp != NULL ; grp = nextgroup)
1269 {
1270 /* This is confusing... get the nextgroup so that if we
1271 choose to remove it it doesn't screw up the iterator.
1272 Then next time through use this "next" value */
1273 nextgroup = sl_next( &GroupsList, &giter );
1274 mbr = G_get_member( grp, private_group_name );
1275 if( mbr == NULL ) continue; /* no such member in that group */
1276
1277 /* Extract this member from group */
1278 if( p.id == My.id )
1279 {
1280 /* extract the mbox if local member */
1281 for( i=0, j=0; i < grp->num_local; i++, j++ )
1282 {
1283 if( grp->mbox[i] == Sessions[ses].mbox ) j--;
1284 else grp->mbox[j] = grp->mbox[i];
1285 }
1286 grp->num_local--;
1287 }
1288 memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME );
1289 sl_remove( &grp->MembersList, mbr->private_name, dispose );
1290 grp->num_members--;
1291 if( grp->num_members == 0 )
1292 {
1293 sl_destruct ( &grp->MembersList, dispose);
1294 sl_remove( &GroupsList, grp->name, dispose );
1295 Num_groups--;
1296 GlobalStatus.num_groups = Num_groups;
1297 continue;
1298 }
1299
1300 if( grp->changed )
1301 {
1302 if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_kill: changed group in GOP\n");
1303 /*
1304 * If the group is changed (in GTRANS) then there is no need
1305 * to increment group id or to notify the local members.
1306 * They will get a group membership after the state transfer
1307 * terminates.
1308 */
1309 continue;
1310 }
1311
1312 /* Increment group id */
1313 grp->grp_id.index++;
1314
1315 /* Compute the mask */
1316 for(i=0; i<4; i++)
1317 {
1318 grp->grp_mask[i] = 0;
1319 }
1320 {
1321 struct skiplistnode *iter;
1322 member *memp;
1323 for( iter = sl_getlist( &grp->MembersList ),
1324 memp=(member *)iter->data;
1325 iter != NULL;
1326 memp = (member *)sl_next( &grp->MembersList, &iter )) {
1327 p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
1328 temp = 1;
1329 for(i=0; i<p1.seg_index%32; i++)
1330 {
1331 temp *= 2;
1332 }
1333 grp->grp_mask[p1.seg_index/32] |= temp;
1334 }
1335 }
1336
1337 Alarm(GROUPS, "G_handle_kill: Mask for group %s set to %x %x %x %x\n",
1338 grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
1339
1340 if( grp->num_local > 0 )
1341 {
1342 /* notify all local members */
1343 msg = Message_new_message();
1344 num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
1345 head_ptr = Message_get_message_header(msg);
1346
1347 head_ptr->type |= CAUSED_BY_DISCONNECT ;
1348
1349 num_vs_ptr = &Mess_buf[ num_bytes ];
1350 num_bytes += sizeof( int32 );
1351 temp = 1;
1352 memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
1353
1354 vs_ptr = (char *)&Mess_buf[ num_bytes ];
1355 memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
1356 num_bytes += MAX_GROUP_NAME;
1357
1358 head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
1359
1360 mess_link = new( MESSAGE_LINK );
1361 Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
1362 mess_link->mess = msg;
1363 Obj_Inc_Refcount(mess_link->mess);
1364 needed = 0;
1365 for( i=0; i < grp->num_local; i++ )
1366 {
1367 int temp_ses;
1368
1369 temp_ses = Sess_get_session_index ( grp->mbox[i] );
1370 if( Is_memb_session( Sessions[ temp_ses ].status ) )
1371 Sess_write( temp_ses, mess_link, &needed );
1372 }
1373 if ( !needed ) Sess_dispose_message( mess_link );
1374 Message_Dec_Refcount(msg);
1375 }
1376 }
1377 break;
1378
1379 case GGATHER:
1380 Alarm( EXIT, "G_handle_kill in GGATHER\n");
1381
1382 break;
1383
1384 case GGT:
1385 Alarm( EXIT, "G_handle_kill in GGT\n");
1386
1387 break;
1388 }
1389 }
1390
G_handle_groups(message_link * mess_link)1391 void G_handle_groups( message_link *mess_link )
1392 {
1393 char *memb_id_ptr;
1394 membership_id temp_memb_id;
1395 message_obj *msg;
1396 message_header *head_ptr;
1397
1398 Alarm( GROUPS, "G_handle_groups: \n" );
1399
1400 switch( Gstate )
1401 {
1402 case GOP:
1403
1404 Alarm( EXIT, "G_handle_groups in GOP\n");
1405
1406 break;
1407
1408 case GTRANS:
1409
1410 Alarm( EXIT, "G_handle_groups in GTRANS\n");
1411
1412 break;
1413
1414 case GGATHER:
1415 case GGT:
1416
1417 if (Gstate == GGATHER) Alarm( GROUPS, "G_handle_groups in GGATHER\n");
1418 if (Gstate == GGT) Alarm( GROUPS, "G_handle_groups in GGT\n");
1419
1420 msg = mess_link->mess;
1421 Obj_Inc_Refcount(msg);
1422 head_ptr = Message_get_message_header(msg);
1423 memb_id_ptr = Message_get_first_data_ptr(msg);
1424 memcpy( &temp_memb_id, memb_id_ptr, sizeof( membership_id ) );
1425 if( !Same_endian( head_ptr->type ) )
1426 {
1427 /* Flip membership id */
1428 temp_memb_id.proc_id = Flip_int32( temp_memb_id.proc_id );
1429 temp_memb_id.time = Flip_int32( temp_memb_id.time );
1430 }
1431 if( ! Memb_is_equal( temp_memb_id, Reg_memb_id ) )
1432 {
1433 Alarm( GROUPS,
1434 "G_handle_groups: GROUPS message received from bad memb id proc %d, time %d, daemon %s.\n",
1435 temp_memb_id.proc_id, temp_memb_id.time, head_ptr->private_group_name );
1436 Sess_dispose_message( mess_link );
1437 Message_Dec_Refcount(msg);
1438 return;
1439 }
1440
1441 mess_link->next = Gathered.next;
1442 Gathered.next = mess_link;
1443 Num_mess_gathered++;
1444 /* The last Groups message a daemon sends is AGREED. */
1445 if( Is_agreed_mess( head_ptr->type ) ) Num_daemons_gathered++;
1446 Alarm( GROUPS, "G_handle_groups: GROUPS message received from %s - msgs %d, daemons %d\n",
1447 head_ptr->private_group_name, Num_mess_gathered, Num_daemons_gathered );
1448 if( Num_daemons_gathered != Conf_num_procs( &Reg_memb ) )
1449 {
1450 Message_Dec_Refcount(msg);
1451 return;
1452 }
1453 Alarm( GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n",
1454 Num_mess_gathered, Num_daemons_gathered );
1455 /* Replace protocol queue */
1456 Prot_set_down_queue( NORMAL_DOWNQUEUE );
1457
1458 /* lower events threshold */
1459 Session_threshold = LOW_PRIORITY;
1460 Sess_set_active_threshold();
1461
1462 /*
1463 * Compute new groups membership and notify members of
1464 * groups that have changed
1465 */
1466 G_compute_and_notify();
1467
1468 if( Gstate == GGATHER )
1469 {
1470 Gstate = GOP;
1471 GlobalStatus.gstate = Gstate;
1472 }else{
1473 Gstate = GOP;
1474 /* We do want to deliver a transitional signal to any
1475 * groups that are going to get a CAUSED_BY_NETWORK
1476 * after our Reg_memb is delivered. */
1477 G_handle_trans_memb( Trans_memb, Trans_memb_id );
1478 }
1479
1480 Message_Dec_Refcount(msg);
1481 break;
1482 }
1483 }
1484
G_compute_and_notify()1485 static void G_compute_and_notify()
1486 {
1487 group *grp, *new_grp, *orig_grp;
1488 member *mbr;
1489 int changed;
1490 int ret;
1491 int vs_bytes;
1492 char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
1493 int32 num_vs;
1494 int num_exist;
1495 struct worklist *indices[MAX_PROCS_RING];
1496 int num_bytes;
1497 message_link *mess_link;
1498 message_header *head_ptr;
1499 message_obj *msg;
1500 int needed;
1501 char proc_name[MAX_PROC_NAME];
1502 char private_name[MAX_PRIVATE_NAME+1];
1503 int ses;
1504 int i;
1505 Skiplist work;
1506
1507 Alarm( GROUPS, "G_compute_and_notify:\n");
1508 /* Compute groups structure in Work from gathered messages and clear messages */
1509
1510 sl_init(&work);
1511 sl_set_compare(&work, G_work_groups_comp, G_work_groups_keycomp);
1512
1513 for( i=0; i < Num_mess_gathered; i++ )
1514 {
1515 struct worklist *tp;
1516 tp = (struct worklist *)Mem_alloc(sizeof(struct worklist));
1517 tp->groups=NULL;
1518 mess_link = Gathered.next;
1519 Gathered.next = mess_link->next;
1520 ret = G_mess_to_groups( mess_link, tp->name, tp );
1521 if( ret < 0 )
1522 Alarm( EXIT, "G_compute_and_notify: G_mess_to_groups errored %d\n",
1523 ret );
1524 Sess_dispose_message( mess_link );
1525 if ( !sl_insert(&work, tp) )
1526 {
1527 Alarm(EXIT, "G_compute_and_notify: Failed to insert worklist (%s) into work\n", tp->name);
1528 }
1529 }
1530 /*
1531 * for every sorted group name:
1532 * Join the member lists to one list in Groups with a vs set.
1533 * If the group has changed (*)
1534 * Set new gid
1535 * notify all local members: non-new get vs set, new get self.
1536 * cancel new mark.
1537 * dispose of this group is all of Work.
1538 *
1539 * Note: group has changed unless all of this hold:
1540 * - everyone has the same gid
1541 * - gid is not changed (-1)
1542 */
1543
1544 for( num_exist = G_smallest_group_indices( &work, indices ) ;
1545 num_exist > 0 ;
1546 num_exist = G_smallest_group_indices( &work, indices ) )
1547 {
1548 struct skiplistnode *top_iter;
1549 group *this_group;
1550 /* prepare vs set */
1551 vs_bytes = 0;
1552 num_vs_ptr = &Temp_buf[0];
1553 vs_bytes+= sizeof( int32 );
1554 num_vs = 0;
1555
1556 changed = 0;
1557 orig_grp = NULL;
1558 assert( NULL != (this_group = (group *)(sl_getlist(indices[0]->groups)->data)) );
1559 orig_grp = sl_find( &GroupsList, this_group->name, &top_iter);
1560
1561 if( orig_grp == NULL )
1562 {
1563 new_grp = new( GROUP );
1564 memset( new_grp->name, 0, MAX_GROUP_NAME );
1565 strcpy( new_grp->name, this_group->name );
1566
1567 new_grp->grp_id = this_group->grp_id;
1568
1569 new_grp->num_members = 0;
1570 sl_init( &new_grp->MembersList );
1571 sl_set_compare( &new_grp->MembersList,
1572 G_member_recordcompare,
1573 G_member_keycompare);
1574 new_grp->num_local = 0;
1575
1576 sl_insert( &GroupsList, new_grp );
1577 Num_groups++;
1578 GlobalStatus.num_groups = Num_groups;
1579 orig_grp = new_grp;
1580 }else{
1581 /* free members but keep local mbox */
1582 sl_remove_all( &orig_grp->MembersList, dispose );
1583 orig_grp->num_members = 0;
1584 }
1585
1586 for( i=0 ; i < num_exist; i++ )
1587 {
1588 group *currentgroup;
1589 currentgroup =
1590 (group *)sl_getlist(indices[i]->groups)->data;
1591 if( G_id_is_equal( orig_grp->grp_id, currentgroup->grp_id ) )
1592 {
1593 struct skiplistnode *iter;
1594 Skiplist *currentmembers;
1595 currentmembers = ¤tgroup->MembersList;
1596 iter = sl_getlist(currentmembers);
1597 assert(iter != NULL); /* memberlist in Groups message should never be empty */
1598 for( mbr = iter->data;
1599 mbr != NULL;
1600 mbr = sl_next(currentmembers, &iter))
1601 {
1602 /* add this non-new member to vs */
1603 memcpy( &Temp_buf[vs_bytes], mbr->private_name, MAX_GROUP_NAME );
1604 vs_bytes += MAX_GROUP_NAME;
1605 num_vs++;
1606 }
1607 }else{
1608 /* not the same grp_id */
1609 changed = 1;
1610 }
1611 /* in any way, mbr points here to the last member */
1612 /* chain these members */
1613
1614 sl_concat(&orig_grp->MembersList,
1615 ¤tgroup->MembersList);
1616 orig_grp->num_members = orig_grp->MembersList.size;
1617
1618 /* free this Work group */
1619 sl_destruct(¤tgroup->MembersList, dispose);
1620 sl_remove(indices[i]->groups, currentgroup, dispose);
1621 }
1622
1623 memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = current count; */
1624
1625 /* now our orig_grp is almost updated */
1626 grp = orig_grp;
1627
1628 if( grp->changed ) changed = 1;
1629
1630 if( !changed ) continue;
1631
1632 /* the group has changed */
1633 Alarm( GROUPS, "G_compute_and_notify: completed group %s.\n", grp->name );
1634 Alarm( DEBUG, "G_compute_and_notify: changing group id from: " );
1635 G_print_group_id( grp->grp_id );
1636 grp->grp_id.memb_id = Reg_memb_id;
1637 grp->grp_id.index = 1;
1638 grp->changed = 0;
1639 Alarm( DEBUG, " to: " );
1640 G_print_group_id( grp->grp_id );
1641 Alarm( DEBUG, "\n" );
1642
1643 if( grp->num_local > 0 )
1644 {
1645 struct skiplistnode *iter;
1646 msg = Message_new_message();
1647 num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
1648 head_ptr = Message_get_message_header(msg);
1649
1650 head_ptr->type |= CAUSED_BY_NETWORK ;
1651
1652 /* notify non-new local members */
1653 memcpy( &Mess_buf[num_bytes], Temp_buf, vs_bytes );
1654 head_ptr->data_len += vs_bytes;
1655
1656 mess_link = new( MESSAGE_LINK );
1657 Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes + vs_bytes);
1658 mess_link->mess = msg;
1659 Obj_Inc_Refcount(mess_link->mess);
1660 needed = 0;
1661 iter = sl_getlist(&grp->MembersList);
1662 for( mbr = iter->data;
1663 mbr != NULL;
1664 mbr = sl_next(&grp->MembersList, &iter))
1665 {
1666 if( Is_new_member( mbr->status ) ) continue;
1667 if( mbr->proc_id != My.id ) continue;
1668
1669 G_private_to_names( mbr->private_name, private_name, proc_name );
1670 ses = Sess_get_session( private_name );
1671 if( ses < 0 ) Alarm( EXIT, "G_compute_and_notify: no session for %s\n", private_name);
1672
1673 if( Is_memb_session( Sessions[ ses ].status ) )
1674 Sess_write( ses, mess_link, &needed );
1675 }
1676 if( !needed ) Sess_dispose_message( mess_link );
1677 Message_Dec_Refcount(msg);
1678 }
1679 }
1680 Num_mess_gathered = 0;
1681 Num_daemons_gathered = 0;
1682
1683 /* We're going back to GOP... destroy our groups messages. */
1684 G_empty_groups_bufs();
1685
1686 /* Finish freeing the memory in our worklists */
1687 {
1688 struct worklist *worklist;
1689 struct skiplistnode *iter;
1690
1691 iter = sl_getlist(&work);
1692 worklist = (iter)?iter->data:NULL;
1693
1694 while( worklist != NULL ) {
1695 assert( worklist->groups->size == 0 );
1696 dispose( worklist->groups );
1697 worklist = sl_next(&work, &iter);
1698 }
1699 }
1700
1701 sl_destruct( &work, dispose );
1702
1703 G_print();
1704 }
1705
G_smallest_group_indices(Skiplist * work,struct worklist * indices[])1706 static int G_smallest_group_indices( Skiplist *work, struct worklist *indices[] )
1707 {
1708 /*
1709 * this function searches the Work structure for the smallest
1710 * alphabetically ordered group name. It stores
1711 * all of the occurences of that group in the indices array,
1712 * and returns the number of occurences.
1713 */
1714 int num_exist;
1715 int cmp;
1716 struct worklist *worklist;
1717 Skiplist *groups;
1718 struct skiplistnode *iter;
1719
1720 iter = sl_getlist(work);
1721 worklist = (iter)?iter->data:NULL;
1722 num_exist = 0;
1723 if(!worklist) {
1724 return 0;
1725 }
1726 /* set indices[0] to first worklist with any groups */
1727 do {
1728 if ( worklist->groups->size == 0 )
1729 {
1730 worklist = sl_next(work, &iter);
1731 } else {
1732 indices[0] = worklist;
1733 num_exist = 1;
1734 break;
1735 }
1736 } while ( worklist != NULL );
1737
1738 if(!worklist) {
1739 /* All worklist groups are empty (no daemons have any alive groups) */
1740 return 0;
1741 }
1742
1743 worklist = sl_next( work, &iter );
1744 /* Check rest of worklists for any with earlier groups or the same first group as indices[0] */
1745 while ( worklist != NULL )
1746 {
1747 group *first, *current;
1748
1749 groups = worklist->groups;
1750 if( groups->size == 0 )
1751 {
1752 worklist = sl_next(work, &iter);
1753 continue;
1754 }
1755 first = (group *)(sl_getlist(indices[0]->groups)->data);
1756 current = (group *)(sl_getlist(groups)->data);
1757 cmp = strcmp( first->name, current->name );
1758 if( cmp == 0 )
1759 {
1760 indices[num_exist] = worklist;
1761 num_exist++;
1762 }else if( cmp > 0 ){
1763 indices[0] = worklist;
1764 num_exist = 1;
1765 }
1766 worklist = sl_next(work, &iter);
1767 }
1768 return( num_exist );
1769 }
1770
G_id_is_equal(group_id g1,group_id g2)1771 static int G_id_is_equal( group_id g1, group_id g2 )
1772 {
1773 if( g1.index == g2.index && Memb_is_equal( g1.memb_id, g2.memb_id ) )
1774 return( 1 );
1775 else return( 0 );
1776 }
1777
G_get_group(char * group_name)1778 static group *G_get_group( char *group_name )
1779 {
1780 struct skiplistnode *iter;
1781
1782 return sl_find( &GroupsList, group_name, &iter );
1783 }
1784
G_get_member(group * grp,char * private_group_name)1785 static member *G_get_member( group *grp, char *private_group_name )
1786 {
1787 struct skiplistnode *iter;
1788
1789 return sl_find( &grp->MembersList, private_group_name, &iter );
1790 }
1791
G_build_trans_mess(group * grp)1792 static message_link *G_build_trans_mess( group *grp )
1793 {
1794 /*
1795 * This routine builds a ready-to-be-sent transitional message signal
1796 * to the members of the process group grp
1797 */
1798
1799 message_link *mess_link;
1800 scatter *scat;
1801 message_header *head_ptr;
1802 char *gid_ptr;
1803
1804 mess_link = new( MESSAGE_LINK );
1805 mess_link->mess = Message_create_message(TRANSITION_MESS, grp->name);
1806
1807 scat = Message_get_data_scatter(mess_link->mess);
1808 scat->elements[0].len = Message_get_data_header_size() +
1809 sizeof( group_id );
1810 head_ptr = Message_get_message_header(mess_link->mess);
1811 gid_ptr = Message_get_first_data_ptr(mess_link->mess );
1812
1813 head_ptr->data_len = sizeof( group_id );
1814 memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );
1815
1816 return( mess_link );
1817 }
1818
G_build_memb_buf(group * grp,message_obj * msg,char buf[])1819 static int G_build_memb_buf( group *grp, message_obj *msg, char buf[])
1820 {
1821 int num_bytes;
1822 message_header *head_ptr;
1823 char *gid_ptr;
1824 member *mbr;
1825 struct skiplistnode *iter;
1826 char *memb_ptr;
1827
1828
1829 head_ptr = Message_get_message_header(msg);
1830 head_ptr->type = REG_MEMB_MESS;
1831 head_ptr->type = Set_endian( head_ptr->type );
1832 head_ptr->hint = Set_endian( 0 );
1833 memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
1834 head_ptr->num_groups = grp->num_members;
1835 head_ptr->data_len = sizeof( group_id );
1836
1837 num_bytes = 0;
1838 iter = sl_getlist( &grp->MembersList );
1839 mbr = (iter)?(member *)iter->data:NULL;
1840 for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
1841 {
1842 memb_ptr = (char *)&buf[num_bytes];
1843 num_bytes += MAX_GROUP_NAME;
1844 memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
1845 }
1846
1847 gid_ptr = &buf[num_bytes];
1848 num_bytes += sizeof( group_id );
1849 memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );
1850
1851 return( num_bytes );
1852 }
1853
1854
G_build_memb_vs_buf(group * grp,message_obj * msg,char buf[],int32 caused)1855 static int G_build_memb_vs_buf( group *grp, message_obj *msg, char buf[], int32 caused )
1856 {
1857 /*
1858 * This routine builds the memb buffer message, including a virtual synchrony
1859 * (failure atomicity) part with a set which contains only the established members
1860 * in the group membership.
1861 *
1862 * Note that in leave and disconnect we provide the member that left or
1863 * got disconnected in the vs_set. Therefore, caused will always be CAUSED_BY_NETWORK.
1864 */
1865
1866 int num_bytes;
1867 message_header *head_ptr;
1868 char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
1869 struct skiplistnode *iter;
1870 member *mbr;
1871 char *membs_ptr;
1872 int32 num_vs;
1873
1874 num_bytes = G_build_memb_buf( grp, msg, buf);
1875 head_ptr = Message_get_message_header(msg);
1876
1877 head_ptr->type = head_ptr->type | caused;
1878
1879 num_vs_ptr = &buf[num_bytes];
1880 num_bytes += sizeof( int32 );
1881 head_ptr->data_len += sizeof( int32 );
1882 num_vs = 0;
1883
1884 iter = sl_getlist( &grp->MembersList );
1885 mbr = (iter)?(member *)iter->data:NULL;
1886 for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
1887 {
1888 if( Is_established_member( mbr->status ) )
1889 {
1890 membs_ptr = (char *)&buf[num_bytes];
1891 memcpy( membs_ptr, mbr->private_name, MAX_GROUP_NAME );
1892 num_vs++ ;
1893 num_bytes += MAX_GROUP_NAME;
1894 head_ptr->data_len += MAX_GROUP_NAME;
1895 }
1896 }
1897 memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = total count; */
1898
1899 return( num_bytes );
1900 }
1901
G_stamp_groups_buf(char buf[])1902 static void G_stamp_groups_buf( char buf[] )
1903 {
1904 char *memb_id_ptr;
1905 memb_id_ptr = buf;
1906 memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
1907 }
1908
1909 /* This function used to be called G_refresh_groups_msg. */
G_build_groups_msg_hdr(message_obj * msg,int groups_bytes)1910 static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes )
1911 {
1912 message_header *head_ptr;
1913
1914 head_ptr = Message_get_message_header(msg);
1915 head_ptr->type = GROUPS_MESS;
1916 head_ptr->type = Set_endian( head_ptr->type );
1917 head_ptr->hint = Set_endian( 0 );
1918 memset(head_ptr->private_group_name, 0, MAX_GROUP_NAME);
1919 strcpy( head_ptr->private_group_name, My.name );
1920 head_ptr->num_groups = 0;
1921 head_ptr->data_len = groups_bytes;
1922 }
1923
1924 /* This function guarantees that each group's data appears in only one buffer in
1925 * a sequence, and that the sorted order is preserved from the GroupsList. */
G_build_groups_buf(char buf[],struct skiplistnode ** iter_ptr)1926 static int G_build_groups_buf(char buf[], struct skiplistnode **iter_ptr)
1927 {
1928 int num_bytes;
1929 char *memb_id_ptr;
1930 group *grp;
1931 char *gid_ptr;
1932 member *mbr;
1933 struct skiplistnode *giter, *iter;
1934 char *num_memb_ptr;
1935 int16 num_memb;
1936 char *memb_ptr;
1937 int size_for_this_group;
1938
1939 num_bytes = 0;
1940
1941 memb_id_ptr = &buf[num_bytes];
1942 num_bytes += sizeof( membership_id );
1943 memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
1944
1945 giter = (*iter_ptr) ? (*iter_ptr) : (sl_getlist( &GroupsList ));
1946
1947 grp = (giter)?(group *)giter->data:NULL;
1948 for( ; grp != NULL ; grp = sl_next( &GroupsList, &giter ) )
1949 {
1950 if( grp->num_local == 0 ) continue;
1951
1952 size_for_this_group = MAX_GROUP_NAME + sizeof(group_id) + sizeof(int16) +
1953 (grp->num_local * MAX_GROUP_NAME);
1954 /* This requires that the number of local group members be limited. */
1955 if( size_for_this_group > GROUPS_BUF_SIZE - num_bytes ) break;
1956
1957 memcpy( &buf[num_bytes], grp->name, MAX_GROUP_NAME );
1958 num_bytes += MAX_GROUP_NAME;
1959
1960 gid_ptr = &buf[num_bytes];
1961 num_bytes += sizeof( group_id );
1962 memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );
1963
1964 num_memb_ptr = &buf[num_bytes];
1965 num_bytes += sizeof( int16 );
1966 num_memb = 0;
1967
1968 iter = sl_getlist( &grp->MembersList );
1969 mbr = (iter)?(member *)iter->data:NULL;
1970 for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
1971 {
1972 /* collect local members */
1973 if( mbr->proc_id != My.id ) continue;
1974 memb_ptr = (char *)&buf[num_bytes];
1975 num_bytes += MAX_GROUP_NAME;
1976 memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
1977 num_memb++;
1978 }
1979 memcpy(num_memb_ptr, &num_memb, sizeof( int16 ) );
1980 if( num_memb != grp->num_local )
1981 Alarm( EXIT, "G_build_groups_buf: group %s has %d %d members\n",
1982 grp->name, num_memb, grp->num_local );
1983
1984 }
1985 *iter_ptr = giter;
1986 return( num_bytes );
1987 }
1988
G_mess_to_groups(message_link * mess_link,char * name,struct worklist * work)1989 static int G_mess_to_groups( message_link *mess_link, char *name, struct worklist *work )
1990 {
1991 /* the function returns 0 for success or -1 if an error occured */
1992
1993 message_obj *msg;
1994 scatter *scat;
1995 message_header *head_ptr;
1996 proc p;
1997 int num_bytes, total_bytes;
1998 group *grp;
1999 char *gid_ptr;
2000 member *mbr;
2001 char *num_memb_ptr;
2002 int16 num_memb;
2003 int i;
2004
2005 total_bytes = 0;
2006 msg = mess_link->mess;
2007 scat = Message_get_data_scatter(msg);
2008 for( i=0; i < scat->num_elements ; i++ )
2009 {
2010 memcpy( &Temp_buf[total_bytes], scat->elements[i].buf, scat->elements[i].len );
2011 total_bytes += scat->elements[i].len;
2012 }
2013
2014 num_bytes = 0;
2015 head_ptr = Message_get_message_header(msg);
2016 num_bytes += Message_get_data_header_size();
2017 if (0 > Conf_proc_by_name( head_ptr->private_group_name , &p ) )
2018 {
2019 Alarm( PRINT, "G_mess_to_groups: Groups message from someone (%s) not in conf\n", head_ptr->private_group_name);
2020 return( -1 );
2021 }
2022 work->groups = (Skiplist *)Mem_alloc(sizeof(Skiplist));
2023 sl_init(work->groups);
2024 sl_set_compare(work->groups,
2025 G_compare,
2026 G_compare);
2027
2028 memcpy( name, head_ptr->private_group_name, MAX_GROUP_NAME );
2029
2030 num_bytes += sizeof( membership_id );
2031
2032 for( ; num_bytes < total_bytes; )
2033 {
2034 /* creating a new group */
2035 grp = new( GROUP );
2036
2037 memcpy( grp->name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
2038 num_bytes += MAX_GROUP_NAME;
2039
2040 sl_append( work->groups, grp );
2041 sl_init( &grp->MembersList );
2042 sl_set_compare( &grp->MembersList,
2043 G_member_recordcompare,
2044 G_member_keycompare);
2045
2046 gid_ptr = &Temp_buf[num_bytes];
2047 num_bytes += sizeof( group_id );
2048 memcpy( &grp->grp_id, gid_ptr, sizeof(group_id) );
2049
2050 num_memb_ptr = &Temp_buf[num_bytes];
2051 num_bytes += sizeof( int16 );
2052 memcpy( &num_memb, num_memb_ptr, sizeof( int16 ) );
2053
2054 if( !Same_endian( head_ptr->type ) )
2055 {
2056 /* Flip group id */
2057 grp->grp_id.memb_id.proc_id = Flip_int32( grp->grp_id.memb_id.proc_id );
2058 grp->grp_id.memb_id.time = Flip_int32( grp->grp_id.memb_id.time );
2059 grp->grp_id.index = Flip_int32( grp->grp_id.index );
2060
2061 /* flip other parts of the message */
2062 num_memb = Flip_int16( num_memb );
2063 }
2064 /* creating members */
2065 for( i=0; i < num_memb; i++ )
2066 {
2067 mbr = new( MEMBER );
2068
2069 mbr->proc_id = p.id;
2070 mbr->status = ESTABLISHED_MEMBER;
2071 memcpy( mbr->private_name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
2072 num_bytes += MAX_GROUP_NAME;
2073
2074 sl_append( &grp->MembersList, mbr );
2075 }
2076
2077 grp->num_members = num_memb;
2078 memcpy( num_memb_ptr, &num_memb, sizeof( int16 ) );
2079 }
2080 return( 0 );
2081 }
2082
G_analize_groups(int num_groups,char target_groups[][MAX_GROUP_NAME],int target_sessions[])2083 int G_analize_groups( int num_groups, char target_groups[][MAX_GROUP_NAME], int target_sessions[] )
2084 {
2085 static mailbox mbox[MAX_SESSIONS];
2086 static mailbox current[MAX_SESSIONS];
2087 static mailbox *current_ptr;
2088 int num_mbox;
2089 int num_mbox_pre;
2090 int num_current;
2091 group *grp;
2092 member *mbr;
2093 struct skiplistnode *iter;
2094 char proc_name[MAX_PROC_NAME];
2095 char private_name[MAX_PRIVATE_NAME+1];
2096 int found;
2097 int ses;
2098 int ret;
2099 int i, j, k;
2100
2101 /* get the mbox */
2102 num_mbox = 0;
2103 for( i=0; i < num_groups; i++ )
2104 {
2105 if( target_groups[i][0] == '#' )
2106 {
2107 /* private group */
2108 ret = G_private_to_names( target_groups[i], private_name, proc_name );
2109
2110 /* Illegal group */
2111 if( ret < 0 ) continue;
2112
2113 /* this private group is not local */
2114 if( strcmp( My.name, proc_name ) != 0 ) continue;
2115
2116 ses = Sess_get_session( private_name );
2117
2118 /* we have no such session */
2119 if( ses < 0 ) continue;
2120
2121 current[0] = Sessions[ ses ].mbox;
2122 current_ptr = current;
2123 num_current = 1;
2124 }else{
2125 /* regular group */
2126 grp = G_get_group( target_groups[i] );
2127 if( grp == NULL ) continue;
2128 if( Gstate == GOP )
2129 {
2130 current_ptr = grp->mbox;
2131 num_current = grp->num_local;
2132 }else if( Gstate == GTRANS ){
2133 current_ptr = current;
2134 num_current = 0;
2135 iter = sl_getlist( &grp->MembersList );
2136 mbr = (iter)?(member *)iter->data:NULL;
2137 for( ; mbr != NULL ;
2138 mbr = sl_next( &grp->MembersList, &iter ) )
2139 {
2140 if( mbr->proc_id == My.id && !Is_new_member( mbr->status ) )
2141 {
2142 G_private_to_names( mbr->private_name, private_name, proc_name );
2143 ses = Sess_get_session( private_name );
2144 if( ses < 0 ) Alarm( EXIT,
2145 "G_analize_groups: ses is %d private_name is %s\n",
2146 ses, private_name );
2147 current[ num_current ] = Sessions[ ses ].mbox;
2148 num_current++;
2149 }
2150 }
2151 }else {
2152 num_current = 0; /* fool compiler warnings */
2153 Alarm( EXIT, "G_analize_groups: Gstate is %d\n", Gstate );
2154 }
2155 }
2156 num_mbox_pre = num_mbox;
2157 for( j=0; j < num_current; j++ )
2158 {
2159 found = 0;
2160 for( k=0; k < num_mbox_pre; k++ )
2161 {
2162 if( mbox[k] == current_ptr[j] )
2163 {
2164 found = 1;
2165 break;
2166 }
2167 }
2168 if( !found )
2169 {
2170 mbox[num_mbox] = current_ptr[j];
2171 num_mbox++;
2172 }
2173 }
2174 }
2175
2176 /* convert mbox to sessions */
2177 for( i=0; i < num_mbox; i++ ) target_sessions[i] = Sess_get_session_index ( mbox[ i ] );
2178 return( num_mbox );
2179 }
2180
2181
G_set_mask(int num_groups,char target_groups[][MAX_GROUP_NAME],int32u * grp_mask)2182 void G_set_mask( int num_groups, char target_groups[][MAX_GROUP_NAME], int32u *grp_mask )
2183 {
2184 group *grp;
2185 char proc_name[MAX_PROC_NAME];
2186 char private_name[MAX_PRIVATE_NAME+1];
2187 int ret;
2188 int i, j;
2189 proc p;
2190 int32u temp;
2191
2192
2193 for(i=0; i<4; i++)
2194 {
2195 grp_mask[i] = 0;
2196 }
2197
2198 for( i=0; i < num_groups; i++ )
2199 {
2200 if( target_groups[i][0] == '#' )
2201 {
2202 /* private group */
2203 ret = G_private_to_names( target_groups[i], private_name, proc_name );
2204
2205 /* Illegal group */
2206 if( ret < 0 ) continue;
2207
2208 Conf_proc_by_name( proc_name, &p );
2209 temp = 1;
2210 for(j=0; j<p.seg_index%32; j++)
2211 {
2212 temp *= 2;
2213 }
2214 grp_mask[p.seg_index/32] |= temp;
2215
2216 }else{
2217 /* regular group */
2218 grp = G_get_group( target_groups[i] );
2219 if( grp == NULL )
2220 {
2221 p = Conf_my();
2222 temp = 1;
2223 for(j=0; j<p.seg_index%32; j++)
2224 {
2225 temp *= 2;
2226 }
2227 grp_mask[p.seg_index/32] |= temp;
2228 }
2229 else if(( Gstate == GOP )||(Gstate == GTRANS))
2230 {
2231 for(j=0; j<4; j++)
2232 {
2233 grp_mask[j] |= grp->grp_mask[j];
2234 }
2235 p = Conf_my();
2236 temp = 1;
2237 for(j=0; j<p.seg_index%32; j++)
2238 {
2239 temp *= 2;
2240 }
2241 grp_mask[p.seg_index/32] |= temp;
2242
2243 }else Alarm( EXIT, "G_set_mask: Gstate is %d\n", Gstate );
2244 }
2245 }
2246 }
2247
2248
2249
G_private_to_names(char * private_group_name,char * private_name,char * proc_name)2250 int G_private_to_names( char *private_group_name, char *private_name, char *proc_name )
2251 {
2252 char name[MAX_GROUP_NAME];
2253 char *pn, *prvn;
2254 unsigned int priv_name_len, proc_name_len;
2255 int i,legal_private_name;
2256
2257 memcpy(name, private_group_name, MAX_GROUP_NAME );
2258 proc_name_len = 0; /* gcc not smart enough to detect that proc_name_len is always initialized when used */
2259
2260 pn = strchr(&name[1], '#');
2261 if (pn != NULL)
2262 {
2263 pn[0] = '\0';
2264 proc_name_len = strlen( &(pn[1]));
2265 }
2266 priv_name_len = strlen( &(name[1]));
2267 if ( (pn == NULL) || (name[0] != '#' ) ||
2268 ( priv_name_len > MAX_PRIVATE_NAME) ||
2269 ( priv_name_len < 1 ) ||
2270 ( proc_name_len >= MAX_PROC_NAME ) ||
2271 ( proc_name_len < 1 ) )
2272 {
2273 Alarm( GROUPS, "G_private_to_names: Illegal private_group_name (priv, proc)\n");
2274 return( ILLEGAL_GROUP );
2275 }
2276 /* start strings at actual beginning */
2277 pn++;
2278 pn[proc_name_len] = '\0';
2279 prvn = &name[1];
2280 legal_private_name = 1;
2281 for( i=0; i < priv_name_len; i++ )
2282 if( prvn[i] <= '#' ||
2283 prvn[i] > '~' )
2284 {
2285 legal_private_name = 0;
2286 prvn[i] = '.';
2287 }
2288 for( i=0; i < proc_name_len; i++ )
2289 if( pn[i] <= '#' ||
2290 pn[i] > '~' )
2291 {
2292 legal_private_name = 0;
2293 pn[i] = '.';
2294 }
2295 if( !legal_private_name )
2296 {
2297 Alarm( GROUPS, "G_private_to_names: Illegal private_group_name characters (%s) (%s)\n", prvn, pn );
2298 return( ILLEGAL_GROUP );
2299 }
2300 /* copy name components including null termination */
2301 memcpy( private_name, prvn, priv_name_len + 1 );
2302 memcpy( proc_name, pn, proc_name_len + 1 );
2303 return( 1 );
2304 }
2305
G_print()2306 static void G_print()
2307 {
2308 group *grp;
2309 member *mbr;
2310 struct skiplistnode *giter, *iter;
2311 int i, j;
2312
2313 printf("++++++++++++++++++++++\n");
2314 printf("Num of groups: %d\n", Num_groups );
2315 giter = sl_getlist( &GroupsList );
2316 grp = (giter)?(group *)giter->data:NULL;
2317 for( i=0; grp != NULL ; i++, grp = sl_next( &GroupsList, &giter ) )
2318 {
2319 printf("[%d] group %s with %d members:\n", i+1, grp->name, grp->num_members );
2320 iter = sl_getlist( &grp->MembersList );
2321 mbr = (iter)?(member *)iter->data:NULL;
2322 for( j=0; mbr != NULL ;
2323 j++, mbr = sl_next( &grp->MembersList, &iter ) )
2324 {
2325 printf("\t[%d] %s\n", j+1, mbr->private_name );
2326 }
2327 printf("----------------------\n");
2328 }
2329 }
2330
G_empty_groups_bufs()2331 static void G_empty_groups_bufs()
2332 {
2333 groups_buf_link *next;
2334
2335 for( ; Groups_bufs; Groups_bufs = next )
2336 {
2337 next = Groups_bufs->next;
2338 dispose( Groups_bufs );
2339 }
2340 return;
2341 }
2342
G_get_num_local(char * group_name)2343 int G_get_num_local( char *group_name )
2344 {
2345 group *grp = G_get_group( group_name );
2346 if( grp == NULL ) return 0;
2347 return grp->num_local;
2348 }
2349
G_print_group_id(group_id g)2350 static void G_print_group_id( group_id g )
2351 {
2352 Alarm( DEBUG, "{Proc ID: %d, Time: %d, Index: %d}",
2353 g.memb_id.proc_id, g.memb_id.time, g.index );
2354 }
2355