1 /*
2  * The Spread Toolkit.
3  *
4  * The contents of this file are subject to the Spread Open-Source
5  * License, Version 1.0 (the ``License''); you may not use
6  * this file except in compliance with the License.  You may obtain a
7  * copy of the License at:
8  *
9  * http://www.spread.org/license/
10  *
11  * or in the file ``license.txt'' found in this distribution.
12  *
13  * Software distributed under the License is distributed on an AS IS basis,
14  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15  * for the specific language governing rights and limitations under the
16  * License.
17  *
18  * The Creators of Spread are:
19  *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
20  *
21  *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
22  *
23  *  All Rights Reserved.
24  *
25  * Major Contributor(s):
26  * ---------------
27  *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
28  *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
29  *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
30  *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
31  *
32  */
33 
34 
35 #include <string.h>
36 #include <stdio.h>
37 #include <assert.h>
38 #include "membership.h"
39 #include "spread_params.h"
40 #include "flow_control.h"
41 #include "prot_body.h"
42 #include "net_types.h"
43 #include "network.h"
44 #include "objects.h"
45 #include "memory.h"
46 #include "status.h"
47 #include "alarm.h"
48 
/*
 * Kinds of representative.  Stored in rep_info.type; REFER packets also
 * carry one of these values in packet_header.memb_id.time.
 */
#define		POTENTIAL_REP	0
#define		SEG_REP		1
#define		RING_REP	2

/*
 * A list of daemon ids.  Scast_alive() and Send_join() put it on the wire
 * as the two int16 counters followed by num_members int32 ids
 * (2*sizeof(int16) + num_members*sizeof(int32) bytes), so the field order
 * is part of the packet format.
 * NOTE(review): num_pending is only ever reset to 0 in the visible code;
 * its meaning is not established here.
 */
typedef	struct	dummy_members_info{
	int16		num_members;
	int16		num_pending;
	int32		members[MAX_PROCS_RING];
} members_info;

/* One representative: its daemon id, kind (*_REP above) and segment. */
typedef	struct	dummy_rep_info{
	int32		proc_id;
	int16		type;
	int16		seg_index;
} rep_info;

/*
 * A list of representatives.  It is sent as two int16 counters followed by
 * num_reps rep_info entries.  Senders place themselves in reps[0].
 * NOTE(review): rep_index is not referenced in the visible code.
 */
typedef	struct	dummy_reps_info{
	int16		num_reps;
	int16		rep_index;
	rep_info	reps[MAX_REPS];
} reps_info;

/*
 * Per-ring summary record.  It is not used in the visible part of this file.
 * Presumably it is carried in form tokens (cf. Flip_rings) — TODO confirm.
 */
typedef struct	dummy_ring_info{
	membership_id	memb_id;
        int32           trans_time;
	int32		aru;
	int32		highest_seq;
	int32		num_holes;
	int16		num_commit;
	int16		num_trans;
} ring_info;
80 
/* Installed membership; Memb_active_ptr() returns it outside of EVS. */
static	configuration	Membership;
static	membership_id	Membership_id;
/* Membership being installed; Memb_active_ptr() returns it in EVS. */
static	configuration	Future_membership;
static	membership_id	Future_membership_id;

/* Returned by Memb_trans_id(). */
static  membership_id   Trans_memb_id;
/* NOTE(review): presumably timestamps for transitional/new ids — not used in the visible code except Last_time_used's init. */
static  int32           F_trans_memb_time;
static  int32           Last_time_used;

/* Protocol state (OP, SEG, REPRESENTED, GATHER, FORM, EVS), mirrored in GlobalStatus.state. */
static	int		State;
/* Non-zero when this daemon still has a live ring token (it gathers as RING_REP). */
static	int		Token_alive;
/* This daemon, its configured segment, and the daemon representing it (-1 = unknown). */
static  proc            My;
static	segment		My_conf_seg;
static	int32		My_seg_rep;
/* Set once a segment leader has forwarded a foreign message to its ring leader. */
static	int		Foreign_found;
/* Snapshot of the static configuration taken in Memb_init(). */
static	configuration	Cn;

/* Members and representatives gathered so far, and reps to send joins to. */
static	members_info	F_members;
static	reps_info	F_reps;
static	reps_info	Potential_reps;

/* Commit set (reset to just this daemon on token loss). */
static	members_info	Commit_set;
static	members_info	Future_commit_set;

/* Reusable scatter; elements[0] is the packet_header allocated in Memb_init(). */
static	sys_scatter	Send_pack;

/* Used to schedule events "as soon as possible". */
static	sp_time		Zero_timeout 	= {  0, 0};
108 
/*
 * Forward declarations of file-local helpers.
 *
 * NOTE(review): several helpers are declared with empty parentheses
 * (old-style, no prototype).  They are passed to E_queue()/E_dequeue() in
 * this file, which presumably takes an (int, void *) callback.  An empty
 * list keeps them assignment-compatible with it — TODO confirm against
 * E_queue's declaration.  The trade-off is that argument counts are not
 * checked at call sites that precede a definition.  For example,
 * Scast_alive is defined as (int code, void *dummy).
 */
static	void	Memb_handle_alive  ( sys_scatter *scat );
static	void	Memb_handle_join   ( sys_scatter *scat );
static	void	Memb_handle_refer  ( sys_scatter *scat );
static	void	Memb_handle_foreign( sys_scatter *scat );
static	void	Memb_handle_form1  ( sys_scatter *scat );
static	void	Memb_handle_form2  ( sys_scatter *scat );
static	void	Shift_to_seg();
static	void	Gather_or_represented();
static	void	Shift_to_gather();
static	void	Shift_to_represented();
static	void	Form_or_fail();
static	void	Scast_alive();
static	void	Send_join();
static	void 	Lookup_new_members();
static	int	Insert_member( members_info *m, int32 proc_id );
static	int	Insert_rep( reps_info *r, rep_info rep );
static	int32	Smallest_member( members_info *m, int *index );
static	int32	Smallest_rep( reps_info *r, int *index );
static	void	Sort_members( members_info *m );
static	void	Sort_reps( reps_info *r );
static	void	Create_form1();
static	void	Fill_form1( sys_scatter *scat );
static	void	Read_form2( sys_scatter *scat );
static	void	Backoff_membership();
static	void	Flip_members( members_info *members_ptr );
static	void	Flip_reps( reps_info *reps_ptr );
static	void	Flip_rings( char *buf );
136 
Memb_init()137 void		Memb_init()
138 {
139 	packet_header	*pack_ptr;
140 	int32		reference_subnet;
141 	int32		current_subnet;
142 	int		i;
143 
144 	State = OP;
145 	GlobalStatus.state = OP;
146 	GlobalStatus.membership_changes = 0;
147 
148 	My = Conf_my();
149 	Cn = Conf();
150 
151 	reference_subnet = Cn.segments[0].procs[0]->id;
152 	reference_subnet = reference_subnet & 0xffff0000;
153 
154 	Wide_network = 0;
155 	for( i=1; i < Cn.num_segments; i++ )
156 	{
157 		current_subnet = Cn.segments[i].procs[0]->id;
158 		current_subnet = current_subnet & 0xffff0000;
159 		if( current_subnet != reference_subnet )
160 		{
161 			Wide_network = 1;
162 			break;
163 		}
164 	}
165 
166 	if( Wide_network )
167 	{
168 		Token_timeout.sec  =  20; Token_timeout.usec  = 0;
169 		Hurry_timeout.sec  =   6; Hurry_timeout.usec  = 0;
170 
171 		Alive_timeout.sec  =   1; Alive_timeout.usec  = 0;
172 		Join_timeout.sec   =   1; Join_timeout.usec   = 0;
173 		Rep_timeout.sec    =   5; Rep_timeout.usec    = 0;
174 		Seg_timeout.sec    =   2; Seg_timeout.usec    = 0;
175 		Gather_timeout.sec =  10; Gather_timeout.usec = 0;
176 		Form_timeout.sec   =  10; Form_timeout.usec   = 0;
177 		Lookup_timeout.sec =  90; Lookup_timeout.usec = 0;
178 	}else{
179 		Token_timeout.sec  =   5; Token_timeout.usec  = 0;
180 		Hurry_timeout.sec  =   2; Hurry_timeout.usec  = 0;
181 
182 		Alive_timeout.sec  =   1; Alive_timeout.usec  = 0;
183 		Join_timeout.sec   =   1; Join_timeout.usec   = 0;
184 		Rep_timeout.sec    =   2; Rep_timeout.usec    = 500000;
185 		Seg_timeout.sec    =   2; Seg_timeout.usec    = 0;
186 		Gather_timeout.sec =   5; Gather_timeout.usec = 0;
187 		Form_timeout.sec   =   5; Form_timeout.usec   = 0;
188 		Lookup_timeout.sec =  60; Lookup_timeout.usec = 0;
189 	}
190 
191         /* Lookup timeout when only one segment exists can be longer,
192          * since a no remote segments need to be probed
193          */
194         if ( Cn.num_segments == 1 )
195             Lookup_timeout.sec = 300;
196 
197 	Membership = Conf();
198 	My_conf_seg = Cn.segments[My.seg_index];
199 	for( i=0; i < Conf().num_segments; i++ )
200 		Membership.segments[i].num_procs = 0;
201         Conf_append_id_to_seg( &Membership.segments[My.seg_index], My.id);
202 	Membership_id.proc_id = My.id;
203 	Membership_id.time    = -1;
204         Last_time_used        = Membership_id.time;
205 	Transitional	      = 0;
206 	Reg_membership    = Membership;
207 
208 	pack_ptr = new(PACK_HEAD_OBJ);
209 	pack_ptr->proc_id = My.id;
210 
211 	Send_pack.elements[0].len = sizeof(packet_header);
212 	Send_pack.elements[0].buf = (char *)pack_ptr;
213 
214 	Memb_token_loss();
215 }
216 
Memb_active_ptr()217 configuration	*Memb_active_ptr()
218 {
219 	if( State == EVS ) return ( &Future_membership );
220 	else return( &Membership );
221 }
222 
Memb_id()223 membership_id	Memb_id()
224 {
225 	return( Membership_id );
226 }
227 
Memb_trans_id()228 membership_id   Memb_trans_id()
229 {
230         return( Trans_memb_id );
231 }
232 
/*
 * Memb_is_equal: 1 when both ids name the same membership, i.e. the same
 * creating proc_id and the same time; 0 otherwise.
 */
int	Memb_is_equal( membership_id m1, membership_id m2 )
{
	if( m1.proc_id != m2.proc_id )
		return( 0 );

	return( m1.time == m2.time );
}
240 
Memb_state()241 int32	Memb_state()
242 {
243 	return( State );
244 }
245 
Memb_token_alive()246 int	Memb_token_alive()
247 {
248 	return( Token_alive );
249 }
250 
/*
 * Memb_handle_message: dispatch an incoming membership-level packet to
 * the handler for its type (alive, join, refer or foreign/regular).
 * Any other type is fatal.
 */
void	Memb_handle_message( sys_scatter *scat )
{
	packet_header	*hdr = (packet_header *)scat->elements[0].buf;

	if( Is_alive( hdr->type ) ) {
		Alarm( MEMB, "Memb_handle_message: handling alive message\n");
		Memb_handle_alive( scat );
		return;
	}

	if( Is_join( hdr->type ) ) {
		Alarm( MEMB, "Memb_handle_message: handling join message from %d, State is %d\n", hdr->proc_id, State);
		Memb_handle_join( scat );
		return;
	}

	if( Is_refer( hdr->type ) ) {
		Alarm( MEMB, "Memb_handle_message: handling refer message from %d, State is %d\n",hdr->proc_id, State);
		Memb_handle_refer( scat );
		return;
	}

	if( Is_regular( hdr->type ) ) {
		Alarm( NONE, "Memb_handle_message: handling foreign message from %d, State is %d\n",hdr->proc_id, State);
		Memb_handle_foreign( scat );
		return;
	}

	Alarm( EXIT, "Memb_handle_message: unknown message type %d\n",
		hdr->type );
}
279 
/*
 * Memb_handle_token: dispatch a membership token (form1 or form2) to its
 * handler.  Any other token type is fatal.
 */
void	Memb_handle_token( sys_scatter *scat )
{
	token_header	*tok = (token_header *)scat->elements[0].buf;

	if( Is_form1( tok->type ) ) {
		Alarm( MEMB, "Memb_handle_token: handling form1 token\n");
		Memb_handle_form1( scat );
		return;
	}

	if( Is_form2( tok->type ) ) {
		Alarm( MEMB, "Memb_handle_token: handling form2 token\n");
		Memb_handle_form2( scat );
		return;
	}

	Alarm( EXIT, "Memb_handle_token: unknown token type %d\n",
		tok->type );
}
300 
/*
 * Memb_handle_alive: react to an ALIVE packet according to State.
 *
 * OP / EVS: an alive from a member of my (future) ring means my token is
 *           lost.  Declare the loss, then record the sender in F_members.
 * SEG:      record the sender in F_members.
 * REPRESENTED: re-arm the Shift_to_seg timer.
 * GATHER:   without a live token (segment rep), record the sender.  With a
 *           live token (ring rep), treat it like OP.
 * FORM:     ignored.  Membership of the formed ring is not known yet.
 */
static	void	Memb_handle_alive( sys_scatter *scat )
{
	packet_header	*hdr = (packet_header *)scat->elements[0].buf;

	switch( State )
	{
	case OP:
		Alarm( MEMB, "Handle_alive in OP\n");
		if( Conf_id_in_conf( &Membership, hdr->proc_id ) == -1 )
			break;
		Memb_token_loss();
		Insert_member( &F_members, hdr->proc_id );
		break;

	case SEG:
		Alarm( MEMB, "Handle_alive in SEG\n");
		Insert_member( &F_members, hdr->proc_id );
		break;

	case REPRESENTED:
		Alarm( MEMB, "Handle_alive in REPRESENTED\n");
		E_queue( Shift_to_seg, 0, NULL, Rep_timeout );
		break;

	case GATHER:
		Alarm( MEMB, "Handle_alive in GATHER\n");
		if( Token_alive )
		{
			/* Ring leader: only senders from my own ring matter. */
			if( Conf_id_in_conf( &Membership, hdr->proc_id ) == -1 )
				break;
			Memb_token_loss();
		}
		Insert_member( &F_members, hdr->proc_id );
		break;

	case FORM:
		break;

	case EVS:
		if( Conf_id_in_conf( &Future_membership, hdr->proc_id ) == -1 )
			break;
		Memb_token_loss();
		Insert_member( &F_members, hdr->proc_id );
		break;
	}
}
373 
/*
 * Memb_handle_join: process a JOIN packet according to the current State.
 *
 * scat->elements[1] holds a members_info (two int16 counters followed by
 * num_members int32 ids), immediately followed by a reps_info whose
 * reps[0] describes the sender.  Both parts are byte-swapped in place
 * when the sender's endianness differs.
 *
 * The counters come from the network.  They are range-checked before
 * being used to locate reps_info or to index the fixed-size arrays, and
 * malformed packets are logged and dropped.
 * NOTE(review): Flip_members()/Flip_reps() run before these checks; if
 * they iterate over the counts, they may need their own bounds checks.
 */
static	void	Memb_handle_join( sys_scatter *scat )
{
	packet_header	*pack_ptr;
	members_info	*members_ptr;
	reps_info	*reps_ptr;
	packet_header	refer_pack;
	sys_scatter	send_scat;
	proc		p;
	int		i;
	int		ret;
	int		dummy;

    pack_ptr 	= (packet_header *) scat->elements[0].buf;
    members_ptr = (members_info  *) scat->elements[1].buf;

    if( !Same_endian( pack_ptr->type ) ) Flip_members( members_ptr );

    /* num_members locates reps_info below; reject values outside members[] */
    if( members_ptr->num_members < 0 || members_ptr->num_members > MAX_PROCS_RING )
    {
	Alarm( PRINT, "Memb_handle_join: bad num_members %d from %d\n",
		members_ptr->num_members, pack_ptr->proc_id );
	return;
    }

    i =	2*sizeof(int16) + (members_ptr->num_members)*sizeof(int32);
    reps_ptr	= (reps_info     *)&scat->elements[1].buf[i];

    if( !Same_endian( pack_ptr->type ) ) Flip_reps( reps_ptr );

    /* OP/SEG/REPRESENTED/GATHER read reps[0]; GATHER walks all num_reps entries */
    if( reps_ptr->num_reps < 1 || reps_ptr->num_reps > MAX_REPS )
    {
	Alarm( PRINT, "Memb_handle_join: bad num_reps %d from %d\n",
		reps_ptr->num_reps, pack_ptr->proc_id );
	return;
    }

    switch( State )
    {
	case OP:
	    /* if sender belongs to my ring then my token is lost (except for single segment confs)
	       otherwise if I am a ring leader shift_to_gather */
	    Alarm( MEMB, "Handle_join in OP\n");
	    if( Conf_id_in_conf( &Membership, pack_ptr->proc_id ) == -1 )
	    {
		if( Conf_leader( &Membership ) == My.id )
		{
			Potential_reps.reps[0] = reps_ptr->reps[0];
			Potential_reps.num_reps = 1;
			F_members.num_members = 0;
			F_members.num_pending = 0;
			Shift_to_gather();
		}else{
			/* if !from my_seg OR seg_leader - answer ref */
			if( reps_ptr->reps[0].seg_index != My.seg_index ||
			    Conf_seg_leader( &Membership, My.seg_index) == My.id )
			{
			    refer_pack.type            = REFER_TYPE;
			    refer_pack.proc_id         = My.id;
			    refer_pack.memb_id.proc_id = Conf_leader( &Membership );
			    refer_pack.memb_id.time    = RING_REP;
			    refer_pack.data_len	       = 0;
			    send_scat.num_elements     = 1;
			    send_scat.elements[0].buf  = (char *)&refer_pack;
			    send_scat.elements[0].len  = sizeof(packet_header);
			    Net_ucast( pack_ptr->proc_id, &send_scat );
			}
		}
	    }else{
		/* sender belongs to my ring - my token is lost */
		/* If only one segment exists, then token is not lost, but rather
		 * a JOIN probe for merged segments was received. This can be
		 * ignored because a real token loss will trigger an ALIVE broadcast
		 * packet which will force the membership change.
		 */
		if ( Cn.num_segments > 1 )
		    Memb_token_loss();
	    }
	    break;

	case SEG:
	    ret = Conf_proc_by_id( pack_ptr->proc_id, &p );
	    if( ret < 0 )
	    {
		/* p would be uninitialised; drop, as Memb_handle_refer does */
		Alarm( PRINT, "Memb_handle_join: unknown proc_id %d\n",
			pack_ptr->proc_id );
		return;
	    }
	    if( My.seg_index == p.seg_index &&
		reps_ptr->reps[0].type == SEG_REP)
	    {
		/* this guy is my representative */
		My_seg_rep = pack_ptr->proc_id;
		Shift_to_represented();
		/*
		 * One immediate alive.  A non-zero code keeps Scast_alive from
		 * re-queueing itself.  Pass both arguments its definition expects.
		 */
		Scast_alive( 1, (void *) NULL );
	    }
	    /* otherwise: no need to remember this join */
	    break;

	case REPRESENTED:
	    ret = Conf_proc_by_id( pack_ptr->proc_id, &p );
	    if( ret < 0 )
	    {
		Alarm( PRINT, "Memb_handle_join: unknown proc_id %d\n",
			pack_ptr->proc_id );
		return;
	    }
	    if( My.seg_index == p.seg_index &&
		reps_ptr->reps[0].type == SEG_REP)
	    {
		if( pack_ptr->proc_id != My_seg_rep )
		{
		    Alarm( MEMB,
			"Memb_handle_join: representative shifts: old %d, new %d\n",
			My_seg_rep, pack_ptr->proc_id );
		    My_seg_rep = pack_ptr->proc_id;
		}
		E_queue( Shift_to_seg, 0, NULL, Rep_timeout );
		Scast_alive( 1, (void *) NULL );
	    }else{
		/* if My_seg_rep is determined -  send it to this guy */
		/* My_seg_rep can be undetermined if it did not issue a join yet */
		if( My_seg_rep != -1 )
		{
			/* send my_seg_rep to pack_ptr->proc_id */
			refer_pack.type            = REFER_TYPE;
			refer_pack.proc_id         = My.id;
			refer_pack.memb_id.proc_id = My_seg_rep;
			refer_pack.memb_id.time    = SEG_REP;
			refer_pack.data_len	   = 0;
			send_scat.num_elements     = 1;
			send_scat.elements[0].buf  = (char *)&refer_pack;
			send_scat.elements[0].len  = sizeof(packet_header);
			Net_ucast( pack_ptr->proc_id, &send_scat );
		}
	    }
	    break;

	case GATHER:
	    /* update my F_reps list */
	    Insert_rep( &F_reps, reps_ptr->reps[0] );
	    for( i=0; i < reps_ptr->num_reps; i++ )
	    {
		Insert_rep( &Potential_reps, reps_ptr->reps[i] );
	    }
	    /* if sender is my Smallest rep - advance Gather_timeout */
	    if( Smallest_rep( &F_reps, &dummy ) ==  reps_ptr->reps[0].proc_id )
		E_queue( Form_or_fail, 0, NULL, Gather_timeout );

	    break;

	case FORM:
	    /* Ignore */

	    break;

	case EVS:
	    /*
	     * If this guy in my formed membership then my token is lost
	     * Otherwise ignore
	     */
	    if( Conf_id_in_conf( &Future_membership, pack_ptr->proc_id ) != -1 )
	    {
		Memb_token_loss();
		/* update my F_members list */
		Insert_member( &F_members, pack_ptr->proc_id );
	    }
	    break;
    }
}
522 
/*
 * Memb_handle_refer: handle a REFER packet.
 *
 * A refer names another daemon (memb_id.proc_id) and its rep kind
 * (memb_id.time).  Only in GATHER is that daemon added to Potential_reps.
 * In every other state the packet is just logged.
 */
static	void	Memb_handle_refer( sys_scatter *scat )
{
	packet_header	*hdr = (packet_header *) scat->elements[0].buf;
	proc		referred;
	rep_info	new_rep;

	if( State != GATHER )
	{
		switch( State )
		{
		case OP:
			Alarm( MEMB, "Handle_refer in OP\n");
			break;
		case SEG:
			Alarm( MEMB, "Handle_refer in SEG\n");
			break;
		case REPRESENTED:
			Alarm( MEMB, "Handle_refer in REPRESENTED\n");
			break;
		case FORM:
			Alarm( MEMB, "Handle_refer in FORM\n");
			break;
		case EVS:
			Alarm( MEMB, "Handle_refer in EVS\n");
			break;
		}
		return;
	}

	Alarm( MEMB, "Handle_refer in GATHER\n");
	if( Conf_proc_by_id( hdr->memb_id.proc_id, &referred ) < 0 )
	{
		Alarm( PRINT, "Memb_handle_refer: unknown proc_id %d\n",
			hdr->memb_id.proc_id );
		return;
	}

	new_rep.proc_id   = referred.id;
	new_rep.type      = hdr->memb_id.time;
	new_rep.seg_index = referred.seg_index;
	Insert_rep( &Potential_reps, new_rep );
}
571 
/*
 * Memb_handle_foreign: handle a regular (data) packet from a daemon
 * outside the current membership.
 *
 * Only OP reacts:
 * - The ring leader immediately looks up new members.
 * - A segment leader forwards the first such packet it sees to the ring
 *   leader (Foreign_found prevents repeats).
 * - Unknown senders are logged and dropped.
 * Every other state just logs the packet.
 */
static	void	Memb_handle_foreign( sys_scatter *scat )
{
	packet_header	*hdr = (packet_header *) scat->elements[0].buf;
	proc		sender;
	sys_scatter	fwd_scat;
	int		copy;

	switch( State )
	{
	case OP:
		Alarm( NONE, "Handle_foreign in OP\n");
		break;

	case SEG:
		Alarm( NONE, "Handle_foreign in SEG\n");
		return;

	case REPRESENTED:
		Alarm( NONE, "Handle_foreign in REPRESENTED\n");
		return;

	case GATHER:
		Alarm( NONE, "Handle_foreign in GATHER\n");
		return;

	case FORM:
		Alarm( NONE, "Handle_foreign in FORM\n");
		return;

	case EVS:
		Alarm( NONE, "Handle_foreign in EVS\n");
		/*
		 * This may happen when multiple old memberships merge
		 * and a message is retransmitted using bcast or scast.
		 * i.e. more than one proc misses it and another proc receives it
		 */
		return;

	default:
		return;
	}

	/* From here on State == OP. */
	if( Conf_proc_by_id( hdr->proc_id, &sender ) < 0 )
	{
		Alarm( PRINT, "Memb_handle_foreign: unknown proc_id %d\n",
			hdr->proc_id );
		return;
	}

	if( Conf_leader( &Membership ) == My.id )
	{
		Lookup_new_members();
		return;
	}

	if( Conf_seg_leader( &Membership, My.seg_index ) != My.id || Foreign_found )
		return;

	/* Segment leader: forward this one foreign message to the ring leader. */
	Foreign_found = 1;
	fwd_scat.num_elements    = 2;
	fwd_scat.elements[0].buf = scat->elements[0].buf;
	fwd_scat.elements[0].len = scat->elements[0].len;
	fwd_scat.elements[1].buf = scat->elements[1].buf;
	fwd_scat.elements[1].len = hdr->data_len;

	/* Sent twice.  NOTE(review): presumably to survive one lost unicast — TODO confirm. */
	for( copy = 0; copy < 2; copy++ )
		Net_ucast( Conf_leader( &Membership ), &fwd_scat );
}
641 
/*
 * Memb_handle_form1: handle a form1 token.
 * It is filled in (Fill_form1) in REPRESENTED, in GATHER, and in OP by
 * any daemon that is not the leader of its ring.  Every other case
 * swallows the token.
 */
static	void	Memb_handle_form1( sys_scatter *scat )
{
	switch( State )
	{
	case OP:
		Alarm( MEMB, "Handle_form1 in OP\n");
		if( Conf_leader( &Membership ) != My.id )
			Fill_form1( scat );
		break;

	case REPRESENTED:
		Alarm( MEMB, "Handle_form1 in REPRESENTED\n");
		Fill_form1( scat );
		break;

	case GATHER:
		Alarm( MEMB, "Handle_form1 in GATHER\n");
		Fill_form1( scat );
		break;

	case SEG:
		Alarm( MEMB, "Handle_form1 in SEG\n");
		break;

	case FORM:
		Alarm( MEMB, "Handle_form1 in FORM\n");
		break;

	case EVS:
		Alarm( MEMB, "Handle_form1 in EVS\n");
		break;
	}
}
679 
/*
 * Memb_handle_form2: handle a form2 token.
 * Only a daemon in FORM reads it (Read_form2); every other state logs it
 * and swallows it.
 */
static	void	Memb_handle_form2( sys_scatter *scat )
{
	switch( State )
	{
	case FORM:
		Alarm( MEMB, "Handle_form2 in FORM\n");
		Read_form2( scat );
		break;

	case OP:
		Alarm( MEMB, "Handle_form2 in OP\n");
		break;

	case SEG:
		Alarm( MEMB, "Handle_form2 in SEG\n");
		break;

	case REPRESENTED:
		Alarm( MEMB, "Handle_form2 in REPRESENTED\n");
		break;

	case GATHER:
		Alarm( MEMB, "Handle_form2 in GATHER\n");
		break;

	case EVS:
		Alarm( MEMB, "Handle_form2 in EVS\n");
		break;
	}
}
715 
Memb_token_loss()716 void	Memb_token_loss()
717 {
718 	rep_info	temp_rep;
719 	int		i;
720 
721     switch( State )
722     {
723 	case OP:
724 	    /* my token is lost - shift to seg */
725 
726 	    Commit_set.num_members = 1;
727 	    Commit_set.members[0] = My.id;
728 
729 	    Potential_reps.num_reps = 0;
730 	    temp_rep.type = POTENTIAL_REP;
731 	    for( i=0; i < Conf().num_segments; i++ )
732 	    {
733 		if( i == My.seg_index ) continue;
734 		temp_rep.seg_index = i;
735 		if( Membership.segments[i].num_procs > 0 )
736 		{
737 			temp_rep.proc_id = Membership.segments[i].procs[0]->id;
738 		}else{
739 			temp_rep.proc_id = Cn.segments[i].procs[0]->id;
740 		}
741 		Insert_rep( &Potential_reps, temp_rep );
742 	    }
743 	    break;
744 
745 	case SEG:
746 	case REPRESENTED:
747 	    Alarm( EXIT, "Memb_token_loss: bug !!! state is %d\n",State);
748 
749 	    break;
750 
751 	case GATHER:
752 	    /* I think I totally solved it */
753 	    /* If I am not a ring leader it is a bug */
754 	    if( ! Token_alive )
755 		Alarm( EXIT, "Memb_token_loss: bug !!! state is %d\n",State);
756 
757 	    /* I am a ring leader and I lost my ring */
758 	    Commit_set.num_members = 1;
759 	    Commit_set.members[0] = My.id;
760 
761 	    Potential_reps.num_reps = 0;
762 	    temp_rep.type = POTENTIAL_REP;
763 
764 	    /* First adding reps that I gathered - they are more important */
765 	    /* starting from 1 not to include myself!! */
766 	    for( i=1; i < F_reps.num_reps; i++ )
767 	    {
768 		temp_rep = F_reps.reps[i];
769 		Insert_rep( &Potential_reps, temp_rep );
770 	    }
771 
772 	    /* Next adding reps from my membership - they are less important */
773 	    for( i=0; i < Conf().num_segments; i++ )
774 	    {
775 		if( i == My.seg_index ) continue;
776 		temp_rep.seg_index = i;
777 		if( Membership.segments[i].num_procs > 0 )
778 		{
779 			temp_rep.proc_id = Membership.segments[i].procs[0]->id;
780 		}else{
781 			temp_rep.proc_id = Cn.segments[i].procs[0]->id;
782 		}
783 		Insert_rep( &Potential_reps, temp_rep );
784 	    }
785 	    break;
786 
787 	case FORM:
788 	    /* my token is lost - shift to seg */
789 	    /* potential already updated according to form token */
790 	    break;
791 
792 	case EVS:
793 	    /* my token is lost - shift to seg */
794 	    /* clear empty messages */
795 	    Backoff_membership();
796 
797 	    /* update potential according to future_membership */
798 	    Potential_reps.num_reps = 0;
799 	    temp_rep.type = POTENTIAL_REP;
800 	    for( i=0; i < Conf().num_segments; i++ )
801 	    {
802 		if( i == My.seg_index ) continue;
803 		temp_rep.seg_index = i;
804 		if( Future_membership.segments[i].num_procs > 0 )
805 		{
806 			temp_rep.proc_id =
807 				Future_membership.segments[i].procs[0]->id;
808 		}else{
809 			temp_rep.proc_id = Cn.segments[i].procs[0]->id;
810 		}
811 		Insert_rep( &Potential_reps, temp_rep );
812 	    }
813 	    break;
814     }
815     Alarm( MEMB, "Memb_token_loss: I lost my token, state is %d\n",State);
816     Shift_to_seg();
817 
818     Token_alive = 0;
819     E_dequeue( Memb_token_loss,    0, NULL );
820     E_dequeue( Send_join,	   0, NULL );
821     E_dequeue( Form_or_fail,	   0, NULL );
822     E_dequeue( Prot_token_hurry,   0, NULL );
823     E_dequeue( Lookup_new_members, 0, NULL );
824     Last_token->type = 0;
825     Last_token->seq  = 0;
826     Last_token->aru  = 0;
827 
828     for( i=0; i < Conf().num_segments; i++ )
829 	Membership.segments[i].num_procs = 0;
830     Conf_append_id_to_seg(&Membership.segments[My.seg_index], My.id);
831 }
832 
Shift_to_seg()833 static	void	Shift_to_seg()
834 {
835 	State = SEG;
836 	GlobalStatus.state = SEG;
837 
838 	F_members.num_members = 1;
839 	F_members.members[0] = My.id;
840 	F_members.num_pending = 0;
841 	E_queue( Scast_alive, 0, NULL, Zero_timeout );
842 	E_queue( Gather_or_represented,  0, NULL, Seg_timeout );
843 }
844 
Gather_or_represented()845 static	void	Gather_or_represented()
846 {
847 	int	dummy;
848 
849 	My_seg_rep = -1;
850 
851 	if( Smallest_member( &F_members, &dummy ) ==  My.id )
852 	{
853 		Shift_to_gather();
854 	}else{
855 		Shift_to_represented();
856 	}
857 }
858 
Shift_to_gather()859 static	void	Shift_to_gather()
860 {
861 	State = GATHER;
862 	GlobalStatus.state = GATHER;
863 
864 	F_reps.num_reps 	 = 1;
865 	F_reps.reps[0].proc_id 	 = My.id;
866 	F_reps.reps[0].seg_index = My.seg_index;
867 	if( Token_alive )
868 		F_reps.reps[0].type = RING_REP;
869 	else	F_reps.reps[0].type = SEG_REP;
870 
871 	E_dequeue( Scast_alive,        0, NULL );
872 	E_dequeue( Lookup_new_members, 0, NULL );
873 	E_queue( Send_join,    0, NULL, Zero_timeout );
874 	E_queue( Form_or_fail, 0, NULL, Gather_timeout );
875 }
876 
Shift_to_represented()877 static	void	Shift_to_represented()
878 {
879 	State = REPRESENTED;
880 	GlobalStatus.state = REPRESENTED;
881 
882 	E_dequeue( Scast_alive, 0, NULL );
883 	E_dequeue( Gather_or_represented, 0, NULL );
884 	E_queue( Shift_to_seg, 0, NULL, Rep_timeout );
885 }
886 
Form_or_fail()887 static	void	Form_or_fail()
888 {
889 
890 	rep_info	temp_rep;
891 	int		i;
892 	int		dummy;
893 
894 	if( Smallest_rep( &F_reps, &dummy ) ==  My.id )
895 	{
896 		if( Token_alive && F_reps.num_reps == 1 )
897 		{
898 			/* clear everything and go back to op */
899 			E_dequeue( Send_join, 0, NULL);
900 			E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
901 			State = OP;
902 			GlobalStatus.state = OP;
903 		}else{
904 			/* create and send form token */
905 			Create_form1();
906 		}
907 	}else{
908 		if( Token_alive )
909 		{
910 			/* clear everything and go back to op */
911 			Alarm( MEMB, "Form_or_fail:failed, return to OP\n");
912 			E_dequeue( Send_join, 0, NULL );
913 			E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
914 			State = OP;
915 			GlobalStatus.state = OP;
916 		}else{
917 			Alarm( MEMB, "Form_or_fail: failed to gather\n");
918 			/* failed to gather again */
919 			F_members.num_members = 1;
920 			F_members.members[0] = My.id;
921 			F_members.num_pending = 0;
922 
923 	    		Potential_reps.num_reps = 0;
924 	    		temp_rep.type = POTENTIAL_REP;
925 			/* starting from 1 not to include myself!! */
926 			for( i=1; i < F_reps.num_reps; i++ )
927 			{
928 				temp_rep = F_reps.reps[i];
929 				Insert_rep( &Potential_reps, temp_rep );
930 			}
931 
932 			Shift_to_gather();
933 		}
934 	}
935 }
936 
Scast_alive(int code,void * dummy)937 static	void	Scast_alive( int code, void *dummy )
938 {
939 	packet_header	*pack_ptr;
940 
941 	Alarm( MEMB, "Scast_alive: State is %d\n", State);
942 
943 	pack_ptr = (packet_header *)Send_pack.elements[0].buf;
944 	pack_ptr->type = ALIVE_TYPE;
945 	pack_ptr->data_len =
946 		2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
947 
948 	Send_pack.elements[1].buf = (char *)&F_members;
949 	Send_pack.elements[1].len = pack_ptr->data_len;
950 	Send_pack.num_elements = 2;
951 	Net_scast( My.seg_index, &Send_pack );
952 	if( code == 0 )
953 		E_queue( Scast_alive, 0, NULL, Alive_timeout );
954 }
955 
Send_join()956 static	void	Send_join()
957 {
958 	packet_header	*pack_ptr;
959 	int		i;
960 
961 	Alarm( MEMB, "Send_join: State is %d\n", State);
962 
963 	pack_ptr = (packet_header *)Send_pack.elements[0].buf;
964 	pack_ptr->type = JOIN_TYPE;
965 	Send_pack.elements[1].buf = (char *)&F_members;
966 	Send_pack.elements[1].len =
967 		2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
968 	Send_pack.elements[2].buf = (char *)&F_reps;
969 	Send_pack.elements[2].len =
970 		2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info);
971 	Send_pack.num_elements = 3;
972 
973 	pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len;
974 
975 	if( !Token_alive )
976 	{
977 		Net_scast( My.seg_index, &Send_pack );
978 	}
979 	for( i=0; i < Potential_reps.num_reps; i++ )
980 	{
981 		Net_ucast( Potential_reps.reps[i].proc_id, &Send_pack );
982 	}
983 
984 	E_queue( Send_join, 0, NULL, Join_timeout );
985 }
986 
Lookup_new_members()987 static	void	Lookup_new_members()
988 {
989 	packet_header	*pack_ptr;
990 	int		num_missing;
991 	int		i,j;
992 
993 	if( State != OP )
994 	{
995 		Alarm( MEMB, "Lookup_new_member: not in OP state, returning\n");
996 		return;
997 	}
998 
999 	Potential_reps.num_reps = 0;
1000 
1001 	F_reps.num_reps		= 1;
1002 	F_reps.reps[0].proc_id	= My.id;
1003 	F_reps.reps[0].seg_index= My.seg_index;
1004 	F_reps.reps[0].type	= RING_REP;
1005 	F_members.num_members = 0;
1006 	F_members.num_pending = 0;
1007 
1008 	pack_ptr = (packet_header *)Send_pack.elements[0].buf;
1009 	pack_ptr->type = JOIN_TYPE;
1010 	Send_pack.elements[1].buf = (char *)&F_members;
1011 	Send_pack.elements[1].len =
1012 		2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
1013 	Send_pack.elements[2].buf = (char *)&F_reps;
1014 	Send_pack.elements[2].len =
1015 		2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info);
1016 	Send_pack.num_elements = 3;
1017 
1018 	pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len;
1019 
1020 	num_missing = 0;
1021         /* For single segment configured, send local broadcast of join to entire segment -- current members will ignore */
1022         if ( Cn.num_segments == 1 ) {
1023             Net_scast( My.seg_index, &Send_pack );
1024             num_missing++;
1025         } else {
1026             /* Send unicasts to each host that is not in the current membership. */
1027             for( i=0; i < Cn.num_segments; i++ )
1028             {
1029 		for( j=0; j < Cn.segments[i].num_procs; j++ )
1030 		{
1031 		    if( Conf_id_in_conf( &Reg_membership, Cn.segments[i].procs[j]->id ) == -1 )
1032 		    {
1033 			Net_ucast( Cn.segments[i].procs[j]->id, &Send_pack );
1034 			num_missing++;
1035 		    }
1036 		}
1037             }
1038         }
1039 	if( num_missing ) Shift_to_gather();
1040 }
1041 
/*
 * Insert_member: append proc_id to m unless it is already present or m is
 * full (full logs a warning).  Returns 1 if inserted, 0 otherwise.
 */
static	int	Insert_member( members_info *m, int32 proc_id )
{
	int	idx;

	for( idx = 0; idx < m->num_members; idx++ )
	{
		if( m->members[idx] == proc_id )
			return( 0 );
	}

	if( m->num_members == MAX_PROCS_RING )
	{
		/* members structure is full -- so we ignore this new member */
		Alarmp( SPLOG_WARNING, MEMB, "Insert_member: members structure full (%u members) so ignore new member (ID %u.%u.%u.%u)\n", m->num_members, IP1(proc_id), IP2(proc_id), IP3(proc_id), IP4(proc_id) );
		return( 0 );
	}

	m->members[m->num_members++] = proc_id;
	return( 1 );
}
1062 
/*
 * Insert_rep: merge one rep entry into the reps list r.
 *
 * The segment of rep.proc_id is taken from the configuration
 * (Conf_proc_by_id); an unknown proc is fatal.  Per type:
 *   POTENTIAL_REP - ignored if any entry (of any type) from the same
 *                   segment is already present.
 *   SEG_REP       - replaces a POTENTIAL_REP of the same segment; an
 *                   identical SEG_REP is ignored; an existing RING_REP with
 *                   the same proc_id is downgraded to SEG_REP.
 *   RING_REP      - replaces a POTENTIAL_REP of the same segment; an
 *                   existing SEG_REP with the same proc_id is upgraded to
 *                   RING_REP; an identical RING_REP is ignored.
 * Otherwise rep is appended, unless the list already holds MAX_REPS
 * entries (a warning is logged and rep is dropped).
 *
 * Returns 1 when r was changed, 0 when it was left as it was.
 */
static	int	Insert_rep( reps_info *r, rep_info rep )
{
	proc	p;
	int	k;

	if( Conf_proc_by_id( rep.proc_id, &p ) < 0 )
		Alarm( EXIT, "Insert_rep: proc %d not exist\n", rep.proc_id );

	switch( rep.type )
	{
	case POTENTIAL_REP:
	    /* any known rep from this segment already covers it */
	    for( k = 0; k < r->num_reps; k++ )
	    {
		if( r->reps[k].seg_index == p.seg_index ) return( 0 );
	    }
	    break;

	case SEG_REP:
	    for( k = 0; k < r->num_reps; k++ )
	    {
		rep_info	*cur = &r->reps[k];

		if( cur->seg_index != p.seg_index ) continue;

		/* a potential is the only entry of its segment: take its place */
		if( cur->type == POTENTIAL_REP )
		{
			cur->type    = SEG_REP;
			cur->proc_id = p.id;
			return( 1 );
		}
		if( cur->proc_id != p.id ) continue;

		if( cur->type == SEG_REP ) return( 0 );
		if( cur->type == RING_REP )
		{
			/* Former RING_REP lost its ring and became SEG_REP */
			cur->type = SEG_REP;
			return( 1 );
		}
	    }
	    break;

	case RING_REP:
	    for( k = 0; k < r->num_reps; k++ )
	    {
		rep_info	*cur = &r->reps[k];

		if( cur->seg_index != p.seg_index ) continue;

		/* a potential is the only entry of its segment: take its place */
		if( cur->type == POTENTIAL_REP )
		{
			cur->type    = RING_REP;
			cur->proc_id = p.id;
			return( 1 );
		}
		if( cur->proc_id != p.id ) continue;

		if( cur->type == SEG_REP )
		{
			cur->type = RING_REP;
			return( 1 );
		}
		if( cur->type == RING_REP ) return( 0 );
	    }
	    break;

	default:
	    Alarm( EXIT, "Insert_rep: unknown rep.type %d \n", rep.type );
	    break;
	}

	if( r->num_reps != MAX_REPS )
	{
		r->reps[r->num_reps++] = rep;
		return( 1 );
	}

	/* reps structure is full -- so we ignore this new rep */
	Alarmp( SPLOG_WARNING, MEMB, "Insert_rep: reps structure full (%u reps) so ignore new rep (Type %d SegIndex %u ID %u.%u.%u.%u)\n", r->num_reps, rep.type, rep.seg_index, IP1(rep.proc_id), IP2(rep.proc_id), IP3(rep.proc_id), IP4(rep.proc_id) );
	return( 0 );
}
1147 
Smallest_member(members_info * m,int * index)1148 static	int32	Smallest_member( members_info *m, int *index )
1149 {
1150 	int	current;
1151 	int	i;
1152 	proc	curr_proc, i_proc;
1153 	int	ret;
1154 
1155 	current = 0;
1156 	ret = Conf_proc_by_id( m->members[current], &curr_proc );
1157 	if( ret < 0 ) Alarm( EXIT, "Smallest_member: unknown proc_id %d\n",
1158 				m->members[current] );
1159 	for( i=1; i < m->num_members; i++ )
1160 	{
1161 		ret = Conf_proc_by_id( m->members[i], &i_proc );
1162 		if( ret < 0 ) Alarm( EXIT,
1163 			"Smallest_member: Bug! i: %d, proc %d\n",
1164 			i, m->members[i] );
1165 		if( i_proc.seg_index < curr_proc.seg_index  ||
1166 		    ( i_proc.seg_index   == curr_proc.seg_index &&
1167 		      i_proc.index_in_seg < curr_proc.index_in_seg ) )
1168 		{
1169 			curr_proc = i_proc;
1170 			current = i;
1171 		}
1172 	}
1173 	*index = current;
1174 	return( m->members[current] );
1175 }
1176 
Smallest_rep(reps_info * r,int * index)1177 static	int32	Smallest_rep( reps_info *r, int *index )
1178 {
1179 	int	current;
1180 	int	i;
1181 	proc	curr_p, i_p;
1182 	int	ret;
1183 
1184 	current = 0;
1185 	for( i=1; i < r->num_reps; i++ )
1186 	{
1187 		if( r->reps[current].type == SEG_REP )
1188 		{
1189 		    if( r->reps[i].type == SEG_REP )
1190 		    {
1191 			if( r->reps[i].seg_index < r->reps[current].seg_index )
1192 				current = i;
1193 		    }
1194 		}else if( r->reps[current].type == RING_REP ) {
1195 		    if( r->reps[i].type == RING_REP )
1196 		    {
1197 			if( r->reps[i].seg_index < r->reps[current].seg_index )
1198 				current = i;
1199 			else if( r->reps[i].seg_index == r->reps[current].seg_index )
1200 			{
1201 			    ret = Conf_proc_by_id( r->reps[current].proc_id, &curr_p );
1202 			    ret = Conf_proc_by_id( r->reps[i].proc_id, &i_p );
1203 			    if( i_p.index_in_seg < curr_p.index_in_seg )
1204 				current = i;
1205 			}
1206 		    }else if( r->reps[i].type == SEG_REP ){
1207 			current = i;
1208 		    }
1209 
1210 		}else Alarm( EXIT,
1211 			"Smallest_rep: bug! current index %d is proc %d of type %d\n",
1212 			current,r->reps[current].proc_id,r->reps[current].type );
1213 	}
1214 	*index = current;
1215 	return ( r->reps[current].proc_id );
1216 }
1217 
/*
 * Sort_members: reorder m->members[0 .. num_members-1] in place into
 * configuration order (as defined by Smallest_member).
 *
 * Selection sort over a scratch copy: each pass removes the smallest
 * remaining id from the copy (its slot is refilled with the copy's last
 * element) and writes it to the next output position.  m->num_members and
 * m->num_pending keep their values.
 */
static	void	Sort_members( members_info *m )
{
	members_info	remaining;
	int		pos;
	int		pick;

	remaining = *m;
	for( pos = 0; pos < m->num_members; pos++ )
	{
		(void) Smallest_member( &remaining, &pick );
		m->members[pos] = remaining.members[pick];

		/* swap-remove the chosen id from the scratch copy */
		remaining.num_members--;
		remaining.members[pick] = remaining.members[remaining.num_members];
	}
}
1235 
/*
 * Sort_reps: reorder r->reps[0 .. num_reps-1] in place into the order
 * defined by Smallest_rep.
 *
 * Selection sort over a scratch copy: each pass removes the smallest
 * remaining rep from the copy (its slot is refilled with the copy's last
 * element) and writes it to the next output position.  r->num_reps and
 * r->rep_index keep their values.
 */
static	void	Sort_reps( reps_info *r )
{
	reps_info	pool;
	int		out;
	int		found;

	pool = *r;
	for( out = 0; out < r->num_reps; out++ )
	{
		(void) Smallest_rep( &pool, &found );
		r->reps[out] = pool.reps[found];

		/* swap-remove the chosen rep from the scratch copy */
		pool.num_reps--;
		pool.reps[found] = pool.reps[pool.num_reps];
	}
}
1253 
/*
 * Create_form1: build the initial FORM1 token in this daemon and send it on.
 *
 * The token is a token_header (seq = Highest_seq+3333, proc_id = My.id,
 * type FORM1_TYPE) followed by three payload pieces:
 *   1. F_members (members_info): this daemon as the only visited member,
 *      the rest of F_members as pending;
 *   2. F_reps (reps_info), sorted, with rep_index = 1;
 *   3. a ring-info buffer: an int32 ring count (1), then one ring_info for
 *      Membership_id carrying Aru, Highest_seq, the missing seqs in
 *      My_aru+1 .. Highest_seq (holes) and the commit set with My.id first.
 *
 * Destination: the first pending member if there is one, else the next
 * rep, else (singleton) the token is turned into a FORM2 token and sent to
 * this daemon itself.
 *
 * Side effects on globals: F_members, F_reps and Potential_reps are
 * rewritten; Send_join and Prot_token_hurry are dequeued; Memb_token_loss
 * is queued with Form_timeout; Token_alive is cleared; State and
 * GlobalStatus.state become FORM.
 */
static	void	Create_form1()
{
	token_header	form_token;
	ring_info	*rg_info;
	int32		*num_rings;
	int32		*holes_procs_ptr;
	int32		index;
	int		pack_entry;
	int		num_bytes;
	sys_scatter	send_scat;
	char		rg_info_buf[sizeof(token_body)];
	rep_info	temp_rep;
	int		i,j;
        int             cur_num_members;
        members_info    valid_members;

	/*
	 * seq is placed 3333 above Highest_seq; Fill_form1 enforces the same
	 * lower bound and Read_form2 adopts the token seq as Last_seq.
	 */
	form_token.seq = Highest_seq+3333;
	form_token.proc_id = My.id;
	form_token.type = FORM1_TYPE;

	/* if I am a ring leader - update my F_members */
	/*
	 * NOTE(review): assumes F_reps.reps[0] still describes this daemon
	 * (it is set to My.id when the join is sent) - confirm.
	 */
	if( F_reps.reps[0].type == RING_REP )
	{
		F_members.num_members = 0;
		for( i=0; i < Membership.num_segments; i++ )
		    for( j=0; j < Membership.segments[i].num_procs; j++ )
		    {
			F_members.members[F_members.num_members] =
				Membership.segments[i].procs[j]->id;
			F_members.num_members++;
		    }
	}

        cur_num_members = 0;
        for ( i = 0; i < F_members.num_members; i++ ) {
            int invalid_member;
            invalid_member = 0;

            /* Remove from F_members any members that are also in OUR F_reps (except myself). */
            for ( j = 0; j < F_reps.num_reps; j++ ) {
                if ( (F_members.members[i] == F_reps.reps[j].proc_id ) &&
                     (F_members.members[i] != My.id) ) {
                    invalid_member = 1;
                    break;
                }
            }
            if (!invalid_member) {
                valid_members.members[cur_num_members] = F_members.members[i];
                cur_num_members++;
            }
        }
        /* compact the surviving ids back into F_members (order preserved) */
        memcpy( &F_members.members[0], &valid_members.members[0], cur_num_members * sizeof(int32) );
        F_members.num_members = cur_num_members;

	/* I am the first in F_members. put the rest in pending */
	/*
	 * NOTE(review): if the filter above left F_members empty, num_pending
	 * would become -1; this relies on this daemon being members[0].
	 */
	F_members.num_pending = F_members.num_members - 1;
	F_members.num_members = 1;

	Sort_reps( &F_reps );
	/*
	 * rep_index = 1 treats reps[0] (after sorting) as already visited,
	 * i.e. presumably this daemon - confirm it always sorts first here.
	 */
	F_reps.rep_index = 1;
	/* update potential in case of failure */
	Potential_reps.num_reps = 0;
	for( i=0; i < F_reps.num_reps; i++ )
	{
		temp_rep = F_reps.reps[i];
		if( temp_rep.seg_index != My.seg_index )
		{
			temp_rep.type = POTENTIAL_REP;
			Insert_rep( &Potential_reps, temp_rep );
		}
	}

	/*
	 * rg_info_buf layout: int32 ring count, then per ring a ring_info
	 * followed by num_holes hole seqs and num_commit proc ids (the first
	 * num_trans of those are the trans members).
	 * NOTE(review): nothing below bounds the hole / commit writes against
	 * sizeof(token_body).
	 */
	num_bytes = 0;
	num_rings = (int32 *)rg_info_buf;
	*num_rings= 1;
	num_bytes += sizeof(int32);
	rg_info	  = (ring_info *)&rg_info_buf[num_bytes];
	num_bytes += sizeof(ring_info);
	holes_procs_ptr = (int32 *)&rg_info_buf[num_bytes];

	rg_info->memb_id	= Membership_id;
        rg_info->trans_time     = 0;
	rg_info->aru		= Aru;
	rg_info->highest_seq	= Highest_seq;

	/* update holes */
	/* every seq above My_aru (up to Highest_seq) whose packet is missing */
	rg_info->num_holes	= 0;
	for( index = My_aru+1; index <= Highest_seq; index++ )
	{
	    pack_entry = index & PACKET_MASK;
	    if( ! Packets[pack_entry].exist )
	    {
		*holes_procs_ptr = index;
		Alarm( MEMB ,
		    "INSERT HOLE 1 IS %d My_aru is %d, Highest_seq is %d\n",
		    index,My_aru, Highest_seq);
		holes_procs_ptr++;
		num_bytes += sizeof(int32);
		rg_info->num_holes++;
	    }
	}

	/* update commit-trans procs */

	/* insert self in trans and commit */
	rg_info->num_commit	= 1;
	rg_info->num_trans	= 1;
	*holes_procs_ptr = My.id;
	num_bytes += sizeof(int32);
	holes_procs_ptr++;

	/* insert other members of commit set */
	for( i=0; i < Commit_set.num_members; i++ )
	{
		/* skipping self, because already there */
		if( Commit_set.members[i] == My.id ) continue;

		/* insert this member */
		*holes_procs_ptr = Commit_set.members[i];
		holes_procs_ptr++;
		num_bytes += sizeof(int32);
		rg_info->num_commit++;
	}

	send_scat.num_elements = 4;
	send_scat.elements[0].buf = (char *)&form_token;
	send_scat.elements[0].len = sizeof(token_header);
	send_scat.elements[1].buf = (char *)&F_members;
	send_scat.elements[1].len = sizeof(members_info);
	send_scat.elements[2].buf = (char *)&F_reps;
	send_scat.elements[2].len = sizeof(reps_info);
	send_scat.elements[3].buf = rg_info_buf;
	send_scat.elements[3].len = num_bytes;

	/* rtr_len counts the payload only (everything after the header) */
	form_token.rtr_len = send_scat.elements[1].len + send_scat.elements[2].len +
			      send_scat.elements[3].len;

	/* compute whom to send to */
	/* non-singleton paths send the token twice (presumably against loss - confirm) */
	if( F_members.num_pending > 0 )
	{
		/* send to next member in pending list */
		Net_ucast_token( F_members.members[F_members.num_members],
			   &send_scat );
		Net_ucast_token( F_members.members[F_members.num_members],
			   &send_scat );
	}else if( F_reps.rep_index < F_reps.num_reps){
		/* send to next rep */
		Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat );
		Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat );
	}else{
		/* singleton membership */
		/*
		 * Turn the token into FORM2: the single member becomes pending
		 * again, and the reps slot is shortened to sizeof(membership_id)
		 * because a FORM2 receiver reads a membership_id at that position.
		 */
		F_members.num_pending = 1;
		F_members.num_members = 0;
		form_token.type = FORM2_TYPE;
		send_scat.elements[2].len = sizeof(membership_id);
		form_token.rtr_len = send_scat.elements[1].len +
			send_scat.elements[2].len + send_scat.elements[3].len;
		Net_ucast_token( My.id, &send_scat );
	}

	/* stop joining, arm the FORM timeout and enter FORM state */
	E_dequeue( Send_join, 0, NULL );
	E_queue( Memb_token_loss, 0, NULL, Form_timeout );
    	E_dequeue( Prot_token_hurry, 0, NULL );
	Token_alive = 0;

	State = FORM;
	GlobalStatus.state = FORM;
}
1422 
/*
 * Fill_form1: process a received FORM1 token, add this daemon's share to
 * it, and forward it.
 *
 * scat->elements[0] holds the token_header.  scat->elements[1] holds,
 * contiguously, a members_info, a reps_info, an int32 ring count and the
 * ring_info records (each followed by num_holes + num_commit int32s).
 * These are byte-flipped when the sender's endianness differs.
 *
 * By State:
 *   OP / REPRESENTED - this daemon must be the next pending member; it is
 *                      moved from pending to visited.
 *   GATHER           - this daemon must be the current rep and no member
 *                      may be pending; it appends its own members
 *                      (SEG_REP) or its whole ring (RING_REP) and advances
 *                      rep_index.
 *   anything else    - Alarm( EXIT ).
 * If a validity check fails, the function returns without forwarding and
 * without touching any state.
 *
 * Afterwards Potential_reps is rebuilt from the token's reps. Ring info is
 * copied to a local buffer, except the record for Membership_id, which is
 * merged with this daemon's data and re-appended at the end. The token then
 * goes to the next pending member, else the next rep, else it is sorted,
 * turned into FORM2 and sent to the first member.  Finally the join/form
 * timers are adjusted and State becomes FORM.
 */
static	void	Fill_form1( sys_scatter *scat )
{
	sys_scatter	send_scat;
	token_header	*form_token;
	members_info	*m_info;
	reps_info	*r_info;
	ring_info	*old_rg_info, *new_rg_info;
	ring_info	*my_rg_info;
	int32		*old_num_rings, *new_num_rings;
	int32		*my_holes_procs_ptr, *new_holes_procs_ptr;
	int32		index;
	int		pack_entry;
	char		rg_info_buf[sizeof(token_body)];
	char		*c_ptr;
	char		*rings_buf;
	int		num_bytes;
	int		bytes_to_copy;
	rep_info	temp_rep;
	int		i,j,k;
	int             cur_num_members;
        int             num_to_copy;
        members_info    valid_members;

	/* locate the pieces of the received token inside elements[1] */
	num_bytes  = 0;

	form_token = (token_header *)scat->elements[0].buf;

	m_info	   = (members_info *)scat->elements[1].buf;
	num_bytes  += sizeof(members_info);

	r_info	   = (reps_info    *)&scat->elements[1].buf[num_bytes];
	num_bytes  += sizeof(reps_info);

	rings_buf  = &scat->elements[1].buf[num_bytes];
	old_num_rings  = (int32        *)&scat->elements[1].buf[num_bytes];
	num_bytes  += sizeof(int32);

	old_rg_info= (ring_info    *)&scat->elements[1].buf[num_bytes];

	if( !Same_endian( form_token->type ) )
	{
		Flip_members( m_info );
		Flip_reps( r_info );
		Flip_rings( rings_buf );
	}

	/* update header */
	/* keep seq at least Highest_seq+3333 (same bound Create_form1 uses) */
	form_token->proc_id = My.id;
	if( form_token->seq < Highest_seq+3333 ) form_token->seq = Highest_seq+3333;

	/* update members and reps */
	if( State == OP || State == REPRESENTED )
	{
		/* validity check */
		/* the next pending slot must be me; otherwise drop the token */
		if( m_info->members[m_info->num_members] != My.id ||
		    m_info->num_pending <= 0 ) return;

		m_info->num_members++;
		m_info->num_pending--;

	}else if( State == GATHER ){

		/* validity check */
		/* I must be the rep whose turn it is, with no members pending */
		if( r_info->reps[r_info->rep_index].proc_id != My.id ||
		    m_info->num_pending != 0 )
		{
			return;
		}

                /*
                 * Add validation check to my F_members list. I need to remove members:
                 *      1) Any members already in the m_info I just received in form1.
                 *      2) Any members of r_info I just received in form1 (except myself).
                 *      3) Any members that are also in OUR F_reps (except myself).
                 *              The reason for this is that these guys also became seg_reps
                 *              from the same segment as me, but did not make it to the final reps list on form1
                 *              and if included as members will act as seg_reps which they should not.
                 */
                cur_num_members = 0;
                for ( i = 0; i < F_members.num_members; i++ ) {
                        int invalid_member;
                        invalid_member = 0;

                        /* 1) Any members already in the m_info I just received in form1. */
                        for ( j = 0; j < m_info->num_members; j++ ) {
                                if (F_members.members[i] == m_info->members[j] ) {
                                        invalid_member = 1;
                                        break;
                                }
                        }

                        /* 2) Any members of r_info I just received in form1 (except myself). */
                        for ( j = 0; !invalid_member && j < r_info->num_reps; j++ ) {
                                if ( (F_members.members[i] == r_info->reps[j].proc_id ) &&
                                     (F_members.members[i] != My.id) ) {
                                        invalid_member = 1;
                                        break;
                                }
                        }

                        /* 3) Any members that are also in OUR F_reps (except myself). */
                        for ( j = 0; !invalid_member && j < F_reps.num_reps; j++ ) {
                                if ( (F_members.members[i] == F_reps.reps[j].proc_id ) &&
                                     (F_members.members[i] != My.id) ) {
                                        invalid_member = 1;
                                        break;
                                }
                        }
                        if (!invalid_member) {
                                valid_members.members[cur_num_members] = F_members.members[i];
                                cur_num_members++;
                        }
                }
                memcpy( &F_members.members[0], &valid_members.members[0], cur_num_members * sizeof(int32) );
                F_members.num_members = cur_num_members;

                /* Now add all my members into the form1 token */
                /*
                 * NOTE(review): neither branch checks that the appended ids
                 * fit within MAX_PROCS_RING entries of m_info->members.
                 */
		if( r_info->reps[r_info->rep_index].type == SEG_REP )
		{
			/* Fill myself and my members */
			/*
			 * The first appended id (F_members.members[0]) is counted
			 * as visited, the rest as pending - presumably that first
			 * id is My.id; confirm.
			 */
			i = m_info->num_members;
			for( j=0; j < F_members.num_members; j++, i++ )
				m_info->members[i] = F_members.members[j];
			m_info->num_pending = F_members.num_members - 1;
			m_info->num_members += 1;

		}else if( r_info->reps[r_info->rep_index].type == RING_REP ){

			/* Fill myself and my ring members */
			/* whole current Membership appended; first one counted as visited */
			i = m_info->num_members;
			for( j=0; j < Membership.num_segments; j++ )
			    for( k=0; k < Membership.segments[j].num_procs; k++, i++ )
				m_info->members[i] =
					Membership.segments[j].procs[k]->id;
			m_info->num_pending = i - m_info->num_members -1;
			m_info->num_members += 1;
		}else Alarm( EXIT, "Fill_form1: invalid rep type: %d\n",
			r_info->reps[r_info->rep_index].type );

		r_info->rep_index++;

	}else Alarm( EXIT, "Fill_form1: invalid State: %d\n",State );

	/* update potential in case of failure */
	/* every rep from another segment becomes a POTENTIAL_REP candidate */
	Potential_reps.num_reps = 0;
	for( i=0; i < r_info->num_reps; i++ )
	{
		temp_rep = r_info->reps[i];
		if( temp_rep.seg_index != My.seg_index )
		{
			temp_rep.type = POTENTIAL_REP;
			Insert_rep( &Potential_reps, temp_rep );
		}
	}


	/* update rings info */
	/*
	 * Rebuild the rings section in the local rg_info_buf.  Records of
	 * other rings are copied unchanged; the record for my Membership_id
	 * (if present) is only remembered in my_rg_info / my_holes_procs_ptr,
	 * which keep pointing into the received buffer.
	 */
	num_bytes = 0;
	new_num_rings = (int32 *)&rg_info_buf[num_bytes];
	*new_num_rings = 0;
	num_bytes += sizeof(int32);

	my_rg_info = NULL;
	my_holes_procs_ptr = NULL; /* optimiser thinks it may be used
				     uninitialised (its wrong, but it
				     is too subtle for it) */
	for( i=0; i < *old_num_rings; i++ )
	{
	    /* size of this record: header plus its hole seqs and commit ids */
	    bytes_to_copy = sizeof(ring_info) +
			( old_rg_info->num_holes + old_rg_info->num_commit )* sizeof(int32);
	    if( Memb_is_equal( old_rg_info->memb_id, Membership_id ) )
	    {
		my_rg_info = old_rg_info;
		c_ptr = (char *) old_rg_info;
		my_holes_procs_ptr = (int32 *)&c_ptr[sizeof(ring_info)];
                old_rg_info = (ring_info *)&c_ptr[bytes_to_copy];
	    }else{
		new_rg_info= (ring_info    *)&rg_info_buf[num_bytes];
		memmove((char *)new_rg_info, (char *)old_rg_info, bytes_to_copy );
		c_ptr = (char *) old_rg_info;
		old_rg_info = (ring_info *)&c_ptr[bytes_to_copy];
		num_bytes   += bytes_to_copy;
		(*new_num_rings)++;
	    }
	}

	/* append a fresh record for my ring at the end of the rings section */
	new_rg_info= (ring_info    *)&rg_info_buf[num_bytes];
	num_bytes += sizeof(ring_info);
	(*new_num_rings)++;

	new_rg_info->memb_id	 = Membership_id;
        new_rg_info->trans_time  = 0;
	new_rg_info->num_holes	 = 0;
	new_holes_procs_ptr      = (int32 *)&rg_info_buf[num_bytes];

	new_rg_info->aru	 = Aru;
	new_rg_info->highest_seq = Highest_seq;

	if( my_rg_info == NULL )
	{
	    /* first daemon of my ring to see this token */

            if ( *new_num_rings > MAX_FORM_REPS )
            {
                /* This ring_info entry will NOT fit in the FORM token packet.
                 * So since the ring_info is needed for me (this daemon) to be
                 * included in the current membership ring, we will have
                 * to remove ourselves from the m_info list and not
                 * create this ring_info
                 */
                Alarmp( SPLOG_WARNING, MEMB, "Fill_form1: ring_info entry for %u.%u.%u.%u will not fit in FORM token. Removing self from current membership attempt by removing IP from m_info list\n", IP1(My.id), IP2(My.id), IP3(My.id), IP4(My.id));
                /* since new ring is always at the end, we just decrease current byte count */
                num_bytes = num_bytes - sizeof(ring_info);
                (*new_num_rings)--;

                /* Remove ourselves from m_info */
                /*
                 * The shift also moves the pending ids down by one slot.
                 * NOTE(review): num_members is decremented even when My.id
                 * is not found among the visited members.
                 */
                for ( i=0; i < m_info->num_members; i++)
                {
                    if( m_info->members[i] == My.id )
                    {
                        num_to_copy = m_info->num_members + m_info->num_pending - i - 1;
                        memmove(&m_info->members[i], &m_info->members[i+1], num_to_copy * sizeof(int32));
                        break;
                    }
                }
                m_info->num_members--;

            } else {
                /* New ring_info will fit, so create it */
                /* holes: every missing seq in Last_discarded+1 .. Highest_seq */
                for( index = Last_discarded+1; index <= Highest_seq; index++ )
                {
                    pack_entry = index & PACKET_MASK;
                    if( ! Packets[pack_entry].exist )
                    {
			*new_holes_procs_ptr = index;
			Alarm( MEMB , "INSERT HOLE 2 IS %d\n",index);
			new_holes_procs_ptr++;
			num_bytes     += sizeof(int32);
			new_rg_info->num_holes++;
                    }
                }

                /* update commit-trans procs */

                /* insert self in trans and commit */
                new_rg_info->num_commit	= 1;
                new_rg_info->num_trans	= 1;
                *new_holes_procs_ptr = My.id;
                new_holes_procs_ptr++;
                num_bytes += sizeof(int32);

                /* insert other members of commit set */
                for( i=0; i < Commit_set.num_members; i++ )
                {
                    /* skipping self, because already there */
                    if( Commit_set.members[i] == My.id ) continue;

                    /* insert this member */
                    *new_holes_procs_ptr = Commit_set.members[i];
                    new_holes_procs_ptr++;
                    num_bytes += sizeof(int32);
                    new_rg_info->num_commit++;
                }
            }
	}else{
	    /*
	     * Another daemon of my ring already wrote a record: merge it with
	     * my own view into the fresh record appended above.
	     */

	    members_info temp_set;

	    /* keep the larger aru / highest_seq of the two views */
	    if( my_rg_info->aru         > Aru )
		new_rg_info->aru 	= my_rg_info->aru;

	    if( my_rg_info->highest_seq > Highest_seq )
		new_rg_info->highest_seq= my_rg_info->highest_seq;

	    /* keep only the recorded holes that I am missing too */
	    for( i=0; i < my_rg_info->num_holes; i++ )
	    {
		pack_entry = *my_holes_procs_ptr & PACKET_MASK;
		if( ! Packets[pack_entry].exist )
		{
			*new_holes_procs_ptr	= *my_holes_procs_ptr;
			Alarm( MEMB ,
		"INSERT HOLE 3 IS %d My_aru is %d, Highest_seq is %d\n",
				*new_holes_procs_ptr,My_aru, Highest_seq);
			new_holes_procs_ptr++;
			num_bytes	+= sizeof(int32);
			new_rg_info->num_holes++;
		}
		my_holes_procs_ptr++;
	    }

	    /* add holes above the recorded highest_seq that I know about */
	    if( my_rg_info->highest_seq < Highest_seq )
            {
		for( index = my_rg_info->highest_seq+1;
			index <= Highest_seq; index++ )
		{
		    pack_entry = index & PACKET_MASK;
		    if( ! Packets[pack_entry].exist )
		    {
			Alarm( MEMB , "INSERT HOLE 4 IS %d\n",index);
			*new_holes_procs_ptr = index;
			new_holes_procs_ptr++;
			num_bytes     += sizeof(int32);
			new_rg_info->num_holes++;
		    }
		}
            }
	    /* setting temp_set to be trans members only */
	    /* my_holes_procs_ptr now points at the recorded commit ids (trans first) */
	    temp_set.num_members = 0;
	    for( i = 0; i < my_rg_info->num_trans; i++ )
	    {
		Insert_member( &temp_set, *my_holes_procs_ptr);
		my_holes_procs_ptr++;
	    }

	    /* creating an updated temp_set based on my_rg_info and Commit_set */

	    /* adding self to trans members */
	    Insert_member( &temp_set, My.id );
	    if( temp_set.num_members != (my_rg_info->num_trans + 1) )
		Alarm( EXIT, "Fill_form1: incorrect trans set\n");
	    /* num_pending is reused here to hold the trans count */
	    temp_set.num_pending = my_rg_info->num_trans+1;

	    /* adding rest of original commit set */
	    for( i = my_rg_info->num_trans; i < my_rg_info->num_commit; i++ )
	    {
		Insert_member( &temp_set, *my_holes_procs_ptr );
		my_holes_procs_ptr++;
	    }

	    /* adding my commit set */
	    for( i = 0; i < Commit_set.num_members; i++ )
	    {
		Insert_member( &temp_set, Commit_set.members[i] );
	    }

	    /* writing my ring commit and trans information to new_rg_info */
	    new_rg_info->num_commit = temp_set.num_members;
	    new_rg_info->num_trans  = temp_set.num_pending;
	    for( i = 0; i < temp_set.num_members; i++ )
	    {
		*new_holes_procs_ptr = temp_set.members[i];
		new_holes_procs_ptr++;
		num_bytes += sizeof(int32);
	    }
	}

	/* outgoing token: received header/members/reps + rebuilt rings */
	send_scat.num_elements = 4;
	send_scat.elements[0].buf = (char *)form_token;
	send_scat.elements[0].len = sizeof(token_header);
	send_scat.elements[1].buf = (char *)m_info;
	send_scat.elements[1].len = sizeof(members_info);
	send_scat.elements[2].buf = (char *)r_info;
	send_scat.elements[2].len = sizeof(reps_info);
	send_scat.elements[3].buf = rg_info_buf;
	send_scat.elements[3].len = num_bytes;

	form_token->rtr_len = send_scat.elements[1].len + send_scat.elements[2].len +
			      send_scat.elements[3].len;

	/* compute whom to send to */
	/* each destination gets the token twice (presumably against loss - confirm) */
	if( m_info->num_pending > 0 )
	{
		/* send to next member in pending list */
		Net_ucast_token( m_info->members[m_info->num_members],
			&send_scat );
		Net_ucast_token( m_info->members[m_info->num_members],
			&send_scat );
	}else if( r_info->rep_index < r_info->num_reps){
		/* send to next rep */
		Net_ucast_token( r_info->reps[r_info->rep_index].proc_id,
			&send_scat );
		Net_ucast_token( r_info->reps[r_info->rep_index].proc_id,
			&send_scat );
	}else{
		/* prepare form2 token */
		/* all members become pending again, in configuration order */
		Sort_members( m_info );
		m_info->num_pending = m_info->num_members;
		m_info->num_members = 0;
		form_token->type = FORM2_TYPE;
		/* this is the only difference between form1 and form2 tokens */
		send_scat.elements[2].len = sizeof(membership_id);
		form_token->rtr_len = send_scat.elements[1].len +
				send_scat.elements[2].len + send_scat.elements[3].len;
		Net_ucast_token( m_info->members[0], &send_scat );
		Net_ucast_token( m_info->members[0], &send_scat );
	}

	/* stop join/gather timers, arm the FORM timeout and enter FORM state */
	E_dequeue( Send_join, 0, NULL );
	E_dequeue( Form_or_fail, 0, NULL );
	E_dequeue( Shift_to_seg, 0, NULL );
	E_queue( Memb_token_loss, 0, NULL, Form_timeout );
    	E_dequeue( Prot_token_hurry, 0, NULL );
	Token_alive = 0;

	State = FORM;
	GlobalStatus.state = FORM;
}
1819 
Read_form2(sys_scatter * scat)1820 static	void	Read_form2( sys_scatter *scat )
1821 {
1822 	sys_scatter	send_scat;
1823 	token_header	*form_token;
1824 	members_info	*m_info;
1825 	membership_id	*m_id_info;
1826 	ring_info	*rg_info;
1827 	ring_info	*my_rg_info;
1828 	int32		*my_holes_procs_ptr;
1829 	int32		*num_rings;
1830 	int		pack_entry;
1831 	char		*c_ptr;
1832 	char		*rings_buf;
1833 	int		num_bytes;
1834 	int		bytes_to_skip;
1835 	proc		p;
1836 	int		ret;
1837 	int		i;
1838         int32           memb_time = 0;
1839 
1840 	num_bytes  = 0;
1841 
1842 	form_token = (token_header *)scat->elements[0].buf;
1843 
1844 	m_info	   = (members_info *)scat->elements[1].buf;
1845 	num_bytes  += sizeof(members_info);
1846 
1847 	m_id_info   = (membership_id *)&scat->elements[1].buf[num_bytes];
1848 	num_bytes  += sizeof(membership_id);
1849 
1850 	rings_buf  = &scat->elements[1].buf[num_bytes];
1851 	num_rings  = (int32        *)&scat->elements[1].buf[num_bytes];
1852 	num_bytes  += sizeof(int32);
1853 
1854 	rg_info= (ring_info    *)&scat->elements[1].buf[num_bytes];
1855 
1856 	if( !Same_endian( form_token->type ) )
1857 	{
1858 		Flip_members( m_info );
1859 		m_id_info->proc_id = Flip_int32( m_id_info->proc_id );
1860 		m_id_info->time    = Flip_int32( m_id_info->time );
1861 		Flip_rings( rings_buf );
1862 	}
1863 
1864 	form_token->proc_id = My.id;
1865 
1866 	/* validity check */
1867 	if( m_info->members[m_info->num_members] != My.id ||
1868 	    m_info->num_pending <= 0 ) return;
1869 
1870 	m_info->num_members++;
1871 	m_info->num_pending--;
1872 
1873 	Last_seq = form_token->seq;
1874 
1875 	if( m_info->num_members == 1 )
1876 	{
1877                 /* The time in memb_time is saved to use on one of the
1878                  * Trans_memb_ids (there is one per Trans_membership view).
1879                  * The time for the real membership id will be one past
1880                  * the one in memb_time, because I want time to go forward.
1881                  */
1882                 memb_time = E_get_time().sec;
1883                 if( memb_time <= Last_time_used )
1884                         memb_time = Last_time_used + 1;
1885                 Last_time_used = memb_time;
1886 
1887 		/* I am future leader, fill membership_id */
1888 		m_id_info->proc_id = My.id;
1889 		m_id_info->time    = ++Last_time_used;
1890 	}
1891 
1892 	/* build Future membership and Future membership id */
1893 	Future_membership_id.proc_id = m_id_info->proc_id;
1894 	Future_membership_id.time    = m_id_info->time;
1895 	Future_membership = Conf();
1896 	for( i=0; i < Future_membership.num_segments; i++ )
1897 		Future_membership.segments[i].num_procs = 0;
1898 	for( i=0; i < (m_info->num_members + m_info->num_pending); i++ )
1899 	{
1900 		ret = Conf_proc_by_id( m_info->members[i], &p );
1901 		if( ret < 0 ) Alarm( EXIT, "Read_form2: no such id %u\n",
1902 				m_info->members[i] );
1903                 if ( Conf_append_id_to_seg( &Future_membership.segments[p.seg_index], p.id) == -1)
1904                         Alarm( EXIT, "Read_form2: BUG2 no such id %u\n", p.id);
1905 	}
1906 	Net_set_membership( Future_membership );
1907 	FC_new_configuration( );
1908 
1909 	/* get my ring info */
1910 	my_rg_info = NULL;
1911 	my_holes_procs_ptr = NULL;
1912 	for( i=0; i < *num_rings; i++ )
1913 	{
1914 	    bytes_to_skip = sizeof(ring_info) +
1915 			    ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
1916 	    if( Memb_is_equal( rg_info->memb_id, Membership_id ) )
1917 	    {
1918 		my_rg_info = rg_info;
1919 		my_holes_procs_ptr =
1920 		    (int32 *)&scat->elements[1].buf[num_bytes+sizeof(ring_info)];
1921 	    }
1922 	    c_ptr = (char *) rg_info;
1923 	    rg_info = (ring_info *)&c_ptr[bytes_to_skip];
1924 	    num_bytes   += bytes_to_skip;
1925 	}
1926 
1927         if (my_rg_info == NULL) {
1928                 Alarm(EXIT, "Read_form2: num_rings = %d, num_bytes = %d, Memb_id = (%d %d)\n",
1929                       *num_rings, num_bytes, Membership_id.proc_id, Membership_id.time);
1930         }
1931 
1932 	Highest_seq = my_rg_info->highest_seq;
1933 	Aru	    = my_rg_info->aru;
1934         /* Note: this call to Discard_packets handles delivery of all the messages
1935          *       from the old membership with sequence numbers prior to the old Aru.
1936          */
1937 	Discard_packets();
1938 
1939 	for( i=0; i < my_rg_info->num_holes; i++ )
1940 	{
1941 		/* create dummy messages */
1942 		pack_entry = *my_holes_procs_ptr & PACKET_MASK;
1943 		Alarm( MEMB , "EXTRACT HOLE IS %d\n",*my_holes_procs_ptr);
1944 		if( Packets[pack_entry].exist != 0 )
1945 		    Alarm( EXIT, "Read_form2: seq %d should be a hole, but is %d\n",
1946 			*my_holes_procs_ptr, Packets[pack_entry].exist );
1947 		Packets[pack_entry].exist = 3;
1948 		my_holes_procs_ptr++;
1949 	}
1950 
1951 	/* extract future commit set (and future trans membership) */
1952 	Future_commit_set.num_members = my_rg_info->num_commit;
1953 	Future_commit_set.num_pending = my_rg_info->num_trans;
1954 	for( i=0; i < my_rg_info->num_commit; i++ )
1955 	{
1956 		Future_commit_set.members[i] = *my_holes_procs_ptr;
1957 		my_holes_procs_ptr++;
1958 	}
1959 
1960         /* The token circulates in conf order, which also defines the order
1961          * by which we choose "leaders."  So, if noone else has set the id
1962          * for my ring, I get to, and I'll be leader. */
1963         if( !my_rg_info->trans_time )
1964         {
1965                 /* memb_time could be already set, if this daemon is the
1966                  * future leader */
1967                 if( !memb_time )
1968                 {
1969                         memb_time = E_get_time().sec;
1970                         if( memb_time <= Last_time_used )
1971                                 memb_time = Last_time_used + 1;
1972                         Last_time_used = memb_time;
1973                 }
1974                 my_rg_info->trans_time = memb_time;
1975         }
1976         F_trans_memb_time = my_rg_info->trans_time;
1977 
1978 	send_scat.num_elements = 2;
1979 	send_scat.elements[0].buf = scat->elements[0].buf;
1980 	send_scat.elements[0].len = sizeof(token_header);
1981 	send_scat.elements[1].buf = scat->elements[1].buf;
1982 	send_scat.elements[1].len = num_bytes;
1983 
1984 	if( Conf_last( &Future_membership ) != My.id )
1985 	{
1986 		Net_send_token( &send_scat );
1987 		Net_send_token( &send_scat );
1988 		Token_rounds = 0;
1989 
1990 	}else{
1991 		/* build first regular token */
1992 		send_scat.num_elements = 1;
1993 
1994 		form_token->type = 0;
1995 		form_token->seq = 0;
1996 		form_token->aru = Last_seq;
1997 		form_token->flow_control = 0;
1998 		form_token->rtr_len = 0;
1999 
2000 		Net_send_token( &send_scat );
2001 		Token_rounds = 1;
2002 	}
2003 	Token_alive = 1;
2004 	E_queue( Memb_token_loss, 0, NULL, Token_timeout );
2005 
2006 	Last_token->type = 0;
2007 	Last_token->seq  = 0;
2008 	Last_token->aru  = 0;
2009 
2010 	State = EVS;
2011 	GlobalStatus.state = EVS;
2012 }
2013 
Memb_print_form_token(sys_scatter * scat)2014 void	Memb_print_form_token( sys_scatter *scat )
2015 {
2016 	token_header	*form_token;
2017 	members_info	*m_info;
2018 	reps_info	*r_info = NULL; /* avoids compile warning -- gcc not detect initialization */
2019         membership_id   *m_id_info = NULL; /* avoids compile warning -- gcc not detect initialization */
2020 	ring_info	*rg_info;
2021 	int32		*num_rings;
2022 	int32		*commit_id;
2023 	char		*c_ptr;
2024 	int		num_bytes, bytes_to_skip;
2025 	int		i,j, scat_index;
2026         int             is_form1 = 0;
2027         char            form_name[10];
2028 
2029 	num_bytes  = 0;
2030         scat_index = 1;
2031 	form_token = (token_header *)scat->elements[0].buf;
2032 
2033 	m_info	   = (members_info *)&scat->elements[scat_index].buf[num_bytes];
2034 	num_bytes  += sizeof(members_info);
2035 
2036         if (num_bytes == scat->elements[scat_index].len )
2037         {
2038             num_bytes = 0;
2039             scat_index++;
2040         }
2041 
2042         if ( Is_form1( form_token->type ) )
2043 	{
2044             r_info	= (reps_info    *)&scat->elements[scat_index].buf[num_bytes];
2045             num_bytes  += sizeof(reps_info);
2046             is_form1 = 1;
2047 	} else if( Is_form2( form_token->type ) )
2048         {
2049             m_id_info   = (membership_id *)&scat->elements[scat_index].buf[num_bytes];
2050             num_bytes  += sizeof(membership_id);
2051             is_form1 = 0;
2052         } else {
2053             Alarm( EXIT, "Invalid token type received: 0x%x\n", form_token->type);
2054             return;
2055         }
2056 
2057         if (num_bytes == scat->elements[scat_index].len )
2058         {
2059             num_bytes = 0;
2060             scat_index++;
2061         }
2062 
2063 	num_rings  = (int32        *)&scat->elements[scat_index].buf[num_bytes];
2064 	num_bytes  += sizeof(int32);
2065 
2066 	rg_info= (ring_info    *)&scat->elements[scat_index].buf[num_bytes];
2067 
2068         /* Print form_token_header */
2069 
2070         Alarmp( SPLOG_PRINT, PRINT, "=========== Form Token ==========\n");
2071 
2072         if ( is_form1 )
2073             snprintf(&form_name[0], 10, "FORM 1");
2074         else
2075             snprintf(&form_name[0], 10, "FORM 2");
2076 
2077         Alarmp( SPLOG_PRINT, PRINT, "%s Token, sent by %u.%u.%u.%u. Seq: %d\n", form_name, IP1(form_token->transmiter_id), IP2(form_token->transmiter_id), IP3(form_token->transmiter_id), IP4(form_token->transmiter_id), form_token->seq);
2078         Alarmp( SPLOG_PRINT, PRINT, "ProcID: %u.%u.%u.%u\t ARU: %d, ARU LastID: %u.%u.%u.%u\n", IP1(form_token->proc_id), IP2(form_token->proc_id), IP3(form_token->proc_id), IP4(form_token->proc_id), form_token->aru, IP1(form_token->aru_last_id), IP2(form_token->aru_last_id), IP3(form_token->aru_last_id), IP4(form_token->aru_last_id) );
2079         Alarmp( SPLOG_PRINT, PRINT, "FlowControl: %hd\tRTR Len: %hd\n", form_token->flow_control, form_token->rtr_len);
2080         /* Print members list */
2081 
2082         Alarmp( SPLOG_PRINT, PRINT, "Form Token members list -- Active (%hd) Pending (%hd)\n", m_info->num_members, m_info->num_pending);
2083         for (i=0; i < m_info->num_members; i++)
2084         {
2085             Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
2086             if ( (i % 3) == 2 )
2087                 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2088         }
2089 
2090         Alarmp( SPLOG_PRINT_NODATE, PRINT, "\nPending Members:\n");
2091 
2092         for (i= m_info->num_members; i < ( m_info->num_members + m_info->num_pending); i++)
2093         {
2094             Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
2095             if ( (i % 3) == 2 )
2096                 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2097         }
2098         Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2099 
2100         if ( is_form1 )
2101         {
2102             /* Print reps list */
2103             Alarmp( SPLOG_PRINT, PRINT, "Form Token reps list -- Count (%hd) index (%hd)\n", r_info->num_reps, r_info->rep_index);
2104             for (i=0; i < r_info->num_reps; i++)
2105             {
2106                 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u (T %hd SegInd %hd) ", i, IP1(r_info->reps[i].proc_id), IP2(r_info->reps[i].proc_id), IP3(r_info->reps[i].proc_id), IP4(r_info->reps[i].proc_id), r_info->reps[i].type, r_info->reps[i].seg_index );
2107                 if ( (i % 3) == 2 )
2108                     Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2109             }
2110             Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2111         } else  /* so is FORM2 type */
2112         {
2113             Alarmp( SPLOG_PRINT, PRINT, "Form Token Membership ID %u.%u.%u.%u : %d\n", IP1(m_id_info->proc_id), IP2(m_id_info->proc_id), IP3(m_id_info->proc_id), IP4(m_id_info->proc_id), m_id_info->time );
2114         }
2115 
2116         /* Print ring info */
2117         Alarmp( SPLOG_PRINT, PRINT, "Form Token RING list -- Count (%d)\n", *num_rings);
2118         for (i=0; i < *num_rings; i++)
2119         {
2120             bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
2121             c_ptr = (char *) rg_info;
2122 
2123             Alarmp( SPLOG_PRINT, PRINT, "Ring %u: MembID %u.%u.%u.%u - %u\tTransTime %u\n", i, IP1(rg_info->memb_id.proc_id), IP2(rg_info->memb_id.proc_id), IP3(rg_info->memb_id.proc_id), IP4(rg_info->memb_id.proc_id), rg_info->memb_id.time, rg_info->trans_time);
2124             Alarmp( SPLOG_PRINT, PRINT, "\tARU: %d\tHighSeq: %d\tNumHoles: %d\n", rg_info->aru, rg_info->highest_seq, rg_info->num_holes);
2125             Alarmp( SPLOG_PRINT, PRINT, "\tNumCommit: %hd\tNumTrans: %hd\n", rg_info->num_commit, rg_info->num_trans);
2126             /* Now print all missing messages from this ring (holes) */
2127             commit_id = (int32 *) &c_ptr[sizeof(ring_info)];
2128 
2129             Alarmp( SPLOG_PRINT, PRINT, "\tMessage Holes:");
2130             for (j=0; j < rg_info->num_holes; j++)
2131             {
2132                 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u ", *commit_id);
2133                 commit_id++;
2134             }
2135             Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2136 
2137             /* Now print transitional member list */
2138             Alarmp( SPLOG_PRINT, PRINT, "\tTrans List:");
2139             for (j=0; j < rg_info->num_trans; j++)
2140             {
2141                 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
2142                 if ( (j % 3) == 2 )
2143                     Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2144                 commit_id++;
2145             }
2146             Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2147 
2148             /* Now print commit list. This follows the trans list with no gaps. */
2149             Alarmp( SPLOG_PRINT, PRINT, "\tCommit List:");
2150             for (j=rg_info->num_trans; j < rg_info->num_commit; j++)
2151             {
2152                 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
2153                 if ( (j % 3) == 2 )
2154                     Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2155                 commit_id++;
2156             }
2157             Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2158 
2159             /* next ring */
2160 	    rg_info = (ring_info *)&c_ptr[bytes_to_skip];
2161         }
2162 
2163         Alarmp( SPLOG_PRINT, PRINT, "====================================================\n");
2164 }
2165 
Backoff_membership()2166 static	void	Backoff_membership()
2167 {
2168 	int	pack_entry;
2169 	int	i;
2170 
2171 	pack_entry=-1;
2172 	for( i=Last_discarded+1; i <= Highest_seq; i++ )
2173 	{
2174 		/* clear dummy messages */
2175 		pack_entry = i & PACKET_MASK;
2176 		if( Packets[pack_entry].exist == 3 )
2177 			Packets[pack_entry].exist = 0;
2178 	}
2179 
2180 	/* return Aru and My_aru */
2181 	Aru = Last_discarded;
2182 
2183 	My_aru = Last_discarded;
2184 	for( i=Last_discarded+1; i <= Highest_seq; i++ )
2185 	{
2186 		if( !Packets[pack_entry].exist ) break;
2187 		My_aru++;
2188 	}
2189 }
2190 
Memb_commit()2191 void	Memb_commit()
2192 {
2193 	Commit_set = Future_commit_set;
2194 }
2195 
Memb_transitional()2196 void	Memb_transitional()
2197 {
2198 	int	i, j, k;
2199 	int32u	proc_id;
2200 
2201 	Alarm( MEMB, "Memb_transitional\n");
2202 
2203 	Transitional = 1;
2204 	Trans_membership.num_segments = Cn.num_segments;
2205 	for( i=0; i < Cn.num_segments; i++ )
2206 	{
2207                 Trans_membership.segments[i].num_procs = 0;
2208 		for( j=0; j < Cn.segments[i].num_procs; j++ )
2209 		{
2210 		    proc_id = Cn.segments[i].procs[j]->id;
2211 		    for( k=0; k < Commit_set.num_pending; k++ )
2212 		    {
2213 			if( Commit_set.members[k] == proc_id )
2214 			{
2215                                 if ( Conf_append_id_to_seg( &Trans_membership.segments[i], proc_id) == -1)
2216                                         Alarm(EXIT, "Memb_transitional: Commit_set has member %u for trans who doesn't exist\n", proc_id);
2217 				break;
2218 			}
2219 		    }
2220 		}
2221 	}
2222 
2223         Trans_memb_id.proc_id = Conf_leader( &Trans_membership );
2224         Trans_memb_id.time    = F_trans_memb_time;
2225 
2226 	Commit_membership.num_segments = Cn.num_segments;
2227 	for( i=0; i < Cn.num_segments; i++ )
2228 	{
2229 		Commit_membership.segments[i].num_procs = 0;
2230 		for( j=0; j < Cn.segments[i].num_procs; j++ )
2231 		{
2232 		    proc_id = Cn.segments[i].procs[j]->id;
2233 		    for( k=0; k < Commit_set.num_members; k++ )
2234 		    {
2235 			if( Commit_set.members[k] == proc_id )
2236 			{
2237                                 if ( Conf_append_id_to_seg(&Commit_membership.segments[i], proc_id) == -1)
2238                                         Alarm(EXIT, "Memb_transitional: Commit_set has member %u who doesn't exist\n", proc_id);
2239 				break;
2240 			}
2241 		    }
2242 		}
2243 	}
2244 }
2245 
Memb_regular()2246 void	Memb_regular()
2247 {
2248 	int	i;
2249 
2250 	Alarm( MEMB, "Memb_regular\n");
2251 	Transitional = 0;
2252 	State = OP;
2253 	GlobalStatus.state = OP;
2254 	GlobalStatus.membership_changes++;
2255 	Membership = Future_membership;
2256 	Membership_id = Future_membership_id;
2257 	Reg_membership = Membership;
2258 
2259 	GlobalStatus.num_procs = 0;
2260 	GlobalStatus.num_segments = 0;
2261 	for( i=0; i < Membership.num_segments; i++ )
2262 	{
2263 		if( Membership.segments[i].num_procs > 0 )
2264 		{
2265 			GlobalStatus.num_procs += Membership.segments[i].num_procs;
2266 			GlobalStatus.num_segments++;
2267 		}
2268 	}
2269 	GlobalStatus.leader_id = Membership_id.proc_id;
2270 
2271 	Foreign_found = 0;
2272 	if( Conf_leader( &Membership ) == My.id )
2273 		E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
2274 	printf("Membership id is ( %d, %d)\n", Membership_id.proc_id, Membership_id.time );
2275 	printf("%c", Conf_print( &Membership ) );
2276 }
2277 
Flip_members(members_info * members_ptr)2278 void	Flip_members( members_info *members_ptr )
2279 {
2280 /*
2281  * This routine can not be called twice beacuse of num_members and num_pending
2282  */
2283 	int	i;
2284 
2285 	members_ptr->num_members = Flip_int16( members_ptr->num_members );
2286 	members_ptr->num_pending = Flip_int16( members_ptr->num_pending );
2287 	for( i=0; i < members_ptr->num_members + members_ptr->num_pending; i++ )
2288 		members_ptr->members[i] = Flip_int32( members_ptr->members[i] );
2289 }
2290 
Flip_reps(reps_info * reps_ptr)2291 void	Flip_reps( reps_info *reps_ptr )
2292 {
2293 /*
2294  * This routine can not be called twice beacuse of num_reps
2295  */
2296 	int	i;
2297 
2298 	reps_ptr->num_reps  = Flip_int16( reps_ptr->num_reps );
2299 	reps_ptr->rep_index = Flip_int16( reps_ptr->rep_index );
2300 	for( i=0; i < reps_ptr->num_reps; i++ )
2301 	{
2302 	    reps_ptr->reps[i].proc_id	= Flip_int32( reps_ptr->reps[i].proc_id );
2303 	    reps_ptr->reps[i].type	= Flip_int16( reps_ptr->reps[i].type );
2304 	    reps_ptr->reps[i].seg_index = Flip_int16( reps_ptr->reps[i].seg_index );
2305 	}
2306 }
2307 
Flip_rings(char * buf)2308 void	Flip_rings( char *buf )
2309 {
2310 /*
2311  * This routine can not be called twice beacuse of *num_rings
2312  * and of ring_info_ptr->num_holes
2313  */
2314 	ring_info	*ring_info_ptr;
2315 	int32		*num_rings;
2316 	int		ptr;
2317 	char		*c_ptr;
2318 	int32		*seq_or_proc;
2319 	int		i,j;
2320 
2321 	c_ptr = buf;
2322 	ptr = 0;
2323 	num_rings = (int32 *)&c_ptr[ptr];
2324 
2325 	*num_rings = Flip_int32( *num_rings );
2326 	ptr += sizeof(int32);
2327 
2328 	for( i=0; i < *num_rings; i++ )
2329 	{
2330 	    ring_info_ptr = (ring_info *)&c_ptr[ptr];
2331 
2332 	    ring_info_ptr->memb_id.proc_id = Flip_int32( ring_info_ptr->memb_id.proc_id );
2333 	    ring_info_ptr->memb_id.time    = Flip_int32( ring_info_ptr->memb_id.time );
2334             ring_info_ptr->trans_time      = Flip_int32( ring_info_ptr->trans_time );
2335 	    ring_info_ptr->aru		   = Flip_int32( ring_info_ptr->aru );
2336 	    ring_info_ptr->highest_seq	   = Flip_int32( ring_info_ptr->highest_seq );
2337 	    ring_info_ptr->num_holes	   = Flip_int32( ring_info_ptr->num_holes );
2338 	    ring_info_ptr->num_commit	   = Flip_int16( ring_info_ptr->num_commit );
2339 	    ring_info_ptr->num_trans	   = Flip_int16( ring_info_ptr->num_trans );
2340 
2341 	    ptr += sizeof(ring_info);
2342 
2343 	    for( j=0; j < ( ring_info_ptr->num_holes + ring_info_ptr->num_commit ); j++ )
2344 	    {
2345 		seq_or_proc = (int32 *)&c_ptr[ptr];
2346 		*seq_or_proc = Flip_int32( *seq_or_proc );
2347 		ptr += sizeof(int32);
2348 	    }
2349 	}
2350 }
2351