1 /*
2 * The Spread Toolkit.
3 *
4 * The contents of this file are subject to the Spread Open-Source
5 * License, Version 1.0 (the ``License''); you may not use
6 * this file except in compliance with the License. You may obtain a
7 * copy of the License at:
8 *
9 * http://www.spread.org/license/
10 *
11 * or in the file ``license.txt'' found in this distribution.
12 *
13 * Software distributed under the License is distributed on an AS IS basis,
14 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15 * for the specific language governing rights and limitations under the
16 * License.
17 *
18 * The Creators of Spread are:
19 * Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
20 *
21 * Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
22 *
23 * All Rights Reserved.
24 *
25 * Major Contributor(s):
26 * ---------------
27 * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
28 * Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
29 * Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
30 * John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
31 *
32 */
33
34
35 #include <string.h>
36 #include <stdio.h>
37 #include <assert.h>
38 #include "membership.h"
39 #include "spread_params.h"
40 #include "flow_control.h"
41 #include "prot_body.h"
42 #include "net_types.h"
43 #include "network.h"
44 #include "objects.h"
45 #include "memory.h"
46 #include "status.h"
47 #include "alarm.h"
48
/*
 * Kinds of representative, stored in rep_info.type.  The same values are
 * also carried in the memb_id.time field of REFER packets built in
 * Memb_handle_join and read back in Memb_handle_refer.
 *   POTENTIAL_REP - a proc only guessed to represent its segment (picked
 *                   from a membership or from the configuration).
 *   SEG_REP       - a segment representative without a live ring token
 *                   (Shift_to_gather uses it when Token_alive is 0).
 *   RING_REP      - a representative that still has a live ring
 *                   (Token_alive set), e.g. the ring leader named in REFERs.
 */
#define POTENTIAL_REP 0
#define SEG_REP 1
#define RING_REP 2

/*
 * Member list carried in ALIVE and JOIN packets.  Only the two counters and
 * the first num_members entries are sent (2*sizeof(int16) +
 * num_members*sizeof(int32) bytes), so field order and types are part of
 * the wire format.  Byte-flipped by Flip_members for foreign-endian senders.
 */
typedef struct dummy_members_info{
    int16 num_members;
    int16 num_pending;  /* only reset to 0 in the code visible here */
    int32 members[MAX_PROCS_RING];
} members_info;

/* One representative: its proc id, its kind (*_REP above) and its segment. */
typedef struct dummy_rep_info{
    int32 proc_id;
    int16 type;
    int16 seg_index;
} rep_info;

/*
 * Representative list; JOIN packets carry the two counters plus the first
 * num_reps entries.  NOTE(review): rep_index is not referenced in the code
 * visible in this section.
 */
typedef struct dummy_reps_info{
    int16 num_reps;
    int16 rep_index;
    rep_info reps[MAX_REPS];
} reps_info;

/*
 * Per-ring summary record.  NOTE(review): not referenced in the code visible
 * here; presumably carried in form tokens (see Flip_rings) - confirm.
 */
typedef struct dummy_ring_info{
    membership_id memb_id;
    int32 trans_time;
    int32 aru;
    int32 highest_seq;
    int32 num_holes;
    int16 num_commit;
    int16 num_trans;
} ring_info;
80
/*
 * Installed regular membership and its id, plus the membership being formed.
 * Memb_active_ptr() returns Future_membership while in EVS state.
 */
static configuration Membership;
static membership_id Membership_id;
static configuration Future_membership;
static membership_id Future_membership_id;

/* Transitional membership id, returned by Memb_trans_id(). */
static membership_id Trans_memb_id;
static int32 F_trans_memb_time;  /* NOTE(review): not used in the visible section */
static int32 Last_time_used;     /* initialised from Membership_id.time in Memb_init */

static int State;            /* OP, SEG, REPRESENTED, GATHER, FORM or EVS */
static int Token_alive;      /* cleared on token loss; selects RING_REP vs SEG_REP */
static proc My;              /* this daemon, from Conf_my() */
static segment My_conf_seg;  /* my segment as configured */
static int32 My_seg_rep;     /* proc representing me; -1 while not yet known */
static int Foreign_found;    /* set once my seg leader relayed a foreign packet */
static configuration Cn;     /* copy of the configuration taken in Memb_init */

static members_info F_members;    /* procs heard from while gathering */
static reps_info F_reps;          /* reps gathered in GATHER; reps[0] is me */
static reps_info Potential_reps;  /* reps Send_join unicasts to */

static members_info Commit_set;         /* reset to { My.id } on token loss */
static members_info Future_commit_set;  /* NOTE(review): not used in the visible section */

/* Reusable outgoing packet; element 0 is the header allocated in Memb_init. */
static sys_scatter Send_pack;

/* Zero delay, passed to E_queue for events that should fire right away. */
static sp_time Zero_timeout = { 0, 0};
108
/*
 * Forward declarations of this file's static helpers.
 *
 * NOTE(review): most of these use an empty parameter list, which in C is an
 * old-style declaration without a prototype: calls are not checked for
 * argument count or type.  Several are passed to E_queue/E_dequeue as event
 * callbacks.  Scast_alive in particular is defined as
 * (int code, void *dummy), so every direct call must pass both arguments.
 */
static void Memb_handle_alive ( sys_scatter *scat );
static void Memb_handle_join ( sys_scatter *scat );
static void Memb_handle_refer ( sys_scatter *scat );
static void Memb_handle_foreign( sys_scatter *scat );
static void Memb_handle_form1 ( sys_scatter *scat );
static void Memb_handle_form2 ( sys_scatter *scat );
static void Shift_to_seg();
static void Gather_or_represented();
static void Shift_to_gather();
static void Shift_to_represented();
static void Form_or_fail();
static void Scast_alive();
static void Send_join();
static void Lookup_new_members();
static int Insert_member( members_info *m, int32 proc_id );
static int Insert_rep( reps_info *r, rep_info rep );
static int32 Smallest_member( members_info *m, int *index );
static int32 Smallest_rep( reps_info *r, int *index );
static void Sort_members( members_info *m );
static void Sort_reps( reps_info *r );
static void Create_form1();
static void Fill_form1( sys_scatter *scat );
static void Read_form2( sys_scatter *scat );
static void Backoff_membership();
static void Flip_members( members_info *members_ptr );
static void Flip_reps( reps_info *reps_ptr );
static void Flip_rings( char *buf );
136
Memb_init()137 void Memb_init()
138 {
139 packet_header *pack_ptr;
140 int32 reference_subnet;
141 int32 current_subnet;
142 int i;
143
144 State = OP;
145 GlobalStatus.state = OP;
146 GlobalStatus.membership_changes = 0;
147
148 My = Conf_my();
149 Cn = Conf();
150
151 reference_subnet = Cn.segments[0].procs[0]->id;
152 reference_subnet = reference_subnet & 0xffff0000;
153
154 Wide_network = 0;
155 for( i=1; i < Cn.num_segments; i++ )
156 {
157 current_subnet = Cn.segments[i].procs[0]->id;
158 current_subnet = current_subnet & 0xffff0000;
159 if( current_subnet != reference_subnet )
160 {
161 Wide_network = 1;
162 break;
163 }
164 }
165
166 if( Wide_network )
167 {
168 Token_timeout.sec = 20; Token_timeout.usec = 0;
169 Hurry_timeout.sec = 6; Hurry_timeout.usec = 0;
170
171 Alive_timeout.sec = 1; Alive_timeout.usec = 0;
172 Join_timeout.sec = 1; Join_timeout.usec = 0;
173 Rep_timeout.sec = 5; Rep_timeout.usec = 0;
174 Seg_timeout.sec = 2; Seg_timeout.usec = 0;
175 Gather_timeout.sec = 10; Gather_timeout.usec = 0;
176 Form_timeout.sec = 10; Form_timeout.usec = 0;
177 Lookup_timeout.sec = 90; Lookup_timeout.usec = 0;
178 }else{
179 Token_timeout.sec = 5; Token_timeout.usec = 0;
180 Hurry_timeout.sec = 2; Hurry_timeout.usec = 0;
181
182 Alive_timeout.sec = 1; Alive_timeout.usec = 0;
183 Join_timeout.sec = 1; Join_timeout.usec = 0;
184 Rep_timeout.sec = 2; Rep_timeout.usec = 500000;
185 Seg_timeout.sec = 2; Seg_timeout.usec = 0;
186 Gather_timeout.sec = 5; Gather_timeout.usec = 0;
187 Form_timeout.sec = 5; Form_timeout.usec = 0;
188 Lookup_timeout.sec = 60; Lookup_timeout.usec = 0;
189 }
190
191 /* Lookup timeout when only one segment exists can be longer,
192 * since a no remote segments need to be probed
193 */
194 if ( Cn.num_segments == 1 )
195 Lookup_timeout.sec = 300;
196
197 Membership = Conf();
198 My_conf_seg = Cn.segments[My.seg_index];
199 for( i=0; i < Conf().num_segments; i++ )
200 Membership.segments[i].num_procs = 0;
201 Conf_append_id_to_seg( &Membership.segments[My.seg_index], My.id);
202 Membership_id.proc_id = My.id;
203 Membership_id.time = -1;
204 Last_time_used = Membership_id.time;
205 Transitional = 0;
206 Reg_membership = Membership;
207
208 pack_ptr = new(PACK_HEAD_OBJ);
209 pack_ptr->proc_id = My.id;
210
211 Send_pack.elements[0].len = sizeof(packet_header);
212 Send_pack.elements[0].buf = (char *)pack_ptr;
213
214 Memb_token_loss();
215 }
216
Memb_active_ptr()217 configuration *Memb_active_ptr()
218 {
219 if( State == EVS ) return ( &Future_membership );
220 else return( &Membership );
221 }
222
Memb_id()223 membership_id Memb_id()
224 {
225 return( Membership_id );
226 }
227
Memb_trans_id()228 membership_id Memb_trans_id()
229 {
230 return( Trans_memb_id );
231 }
232
Memb_is_equal(membership_id m1,membership_id m2)233 int Memb_is_equal( membership_id m1, membership_id m2 )
234 {
235 if( m1.proc_id == m2.proc_id && m1.time == m2.time )
236 return( 1 );
237 else
238 return( 0 );
239 }
240
Memb_state()241 int32 Memb_state()
242 {
243 return( State );
244 }
245
Memb_token_alive()246 int Memb_token_alive()
247 {
248 return( Token_alive );
249 }
250
Memb_handle_message(sys_scatter * scat)251 void Memb_handle_message( sys_scatter *scat )
252 {
253 packet_header *pack_ptr;
254
255 pack_ptr = (packet_header *)scat->elements[0].buf;
256 if( Is_alive( pack_ptr->type ) )
257 {
258 Alarm( MEMB, "Memb_handle_message: handling alive message\n");
259 Memb_handle_alive( scat );
260
261 }else if( Is_join( pack_ptr->type ) ){
262 Alarm( MEMB, "Memb_handle_message: handling join message from %d, State is %d\n", pack_ptr->proc_id, State);
263 Memb_handle_join( scat );
264
265 }else if( Is_refer( pack_ptr->type ) ){
266 Alarm( MEMB, "Memb_handle_message: handling refer message from %d, State is %d\n",pack_ptr->proc_id, State);
267 Memb_handle_refer( scat );
268
269 }else if( Is_regular( pack_ptr->type ) ){
270 Alarm( NONE, "Memb_handle_message: handling foreign message from %d, State is %d\n",pack_ptr->proc_id, State);
271 Memb_handle_foreign( scat );
272
273 }else{
274 Alarm( EXIT, "Memb_handle_message: unknown message type %d\n",
275 pack_ptr->type );
276
277 }
278 }
279
Memb_handle_token(sys_scatter * scat)280 void Memb_handle_token( sys_scatter *scat )
281 {
282 token_header *token_ptr;
283
284 token_ptr = (token_header *)scat->elements[0].buf;
285 if( Is_form1( token_ptr->type ) )
286 {
287 Alarm( MEMB, "Memb_handle_token: handling form1 token\n");
288 Memb_handle_form1( scat );
289
290 }else if( Is_form2( token_ptr->type ) ){
291 Alarm( MEMB, "Memb_handle_token: handling form2 token\n");
292 Memb_handle_form2( scat );
293
294 }else{
295 Alarm( EXIT, "Memb_handle_token: unknown token type %d\n",
296 token_ptr->type );
297
298 }
299 }
300
Memb_handle_alive(sys_scatter * scat)301 static void Memb_handle_alive( sys_scatter *scat )
302 {
303 packet_header *pack_ptr;
304
305 pack_ptr = (packet_header *)scat->elements[0].buf;
306
307 switch( State )
308 {
309 case OP:
310 Alarm( MEMB, "Handle_alive in OP\n");
311 if( Conf_id_in_conf( &Membership, pack_ptr->proc_id ) != -1 )
312 {
313 /* sender belongs to my ring - my token is lost */
314 Memb_token_loss();
315 /* update my F_members list */
316 Insert_member( &F_members, pack_ptr->proc_id );
317 }
318 break;
319
320 case SEG:
321 Alarm( MEMB, "Handle_alive in SEG\n");
322 /* update my F_members list */
323 Insert_member( &F_members, pack_ptr->proc_id );
324
325 break;
326
327 case REPRESENTED:
328 Alarm( MEMB, "Handle_alive in REPRESENTED\n");
329 E_queue( Shift_to_seg, 0, NULL, Rep_timeout );
330
331 break;
332
333 case GATHER:
334 Alarm( MEMB, "Handle_alive in GATHER\n");
335 /*
336 * If I am a seg representative - take this guy
337 * If I am a ring leader: if this guy in my ring - lost token
338 * otherwise ignore.
339 */
340 if( ! Token_alive ) Insert_member( &F_members, pack_ptr->proc_id );
341 else if( Conf_id_in_conf( &Membership, pack_ptr->proc_id ) != -1 ){
342 /* sender belongs to my ring - my token is lost */
343 Memb_token_loss();
344 /* update my F_members list */
345 Insert_member( &F_members, pack_ptr->proc_id );
346 }
347 break;
348
349 case FORM:
350 /*
351 * Ignore.
352 * In principle if this guy is in my formed membership
353 * then I know I lost the token, but there is no way
354 * to know at this time if this guy is in my formed memb.
355 */
356
357 break;
358
359 case EVS:
360 /*
361 * If this guy in my formed membership then my token is lost
362 * Otherwise ignore
363 */
364 if( Conf_id_in_conf( &Future_membership, pack_ptr->proc_id ) != -1 )
365 {
366 Memb_token_loss();
367 /* update my F_members list */
368 Insert_member( &F_members, pack_ptr->proc_id );
369 }
370 break;
371 }
372 }
373
Memb_handle_join(sys_scatter * scat)374 static void Memb_handle_join( sys_scatter *scat )
375 {
376 packet_header *pack_ptr;
377 members_info *members_ptr;
378 reps_info *reps_ptr;
379 packet_header refer_pack;
380 sys_scatter send_scat;
381 proc p;
382 int i;
383 int ret;
384 int dummy;
385
386 pack_ptr = (packet_header *) scat->elements[0].buf;
387 members_ptr = (members_info *) scat->elements[1].buf;
388
389 if( !Same_endian( pack_ptr->type ) ) Flip_members( members_ptr );
390
391 i = 2*sizeof(int16) + (members_ptr->num_members)*sizeof(int32);
392 reps_ptr = (reps_info *)&scat->elements[1].buf[i];
393
394 if( !Same_endian( pack_ptr->type ) ) Flip_reps( reps_ptr );
395
396 switch( State )
397 {
398 case OP:
399 /* if sender belongs to my ring then my token is lost (except for single segment confs)
400 otherwise if I am a ring leader shift_to_gather */
401 Alarm( MEMB, "Handle_join in OP\n");
402 if( Conf_id_in_conf( &Membership, pack_ptr->proc_id ) == -1 )
403 {
404 if( Conf_leader( &Membership ) == My.id )
405 {
406 Potential_reps.reps[0] = reps_ptr->reps[0];
407 Potential_reps.num_reps = 1;
408 F_members.num_members = 0;
409 F_members.num_pending = 0;
410 Shift_to_gather();
411 }else{
412 /* if !from my_seg OR seg_leader - answer ref */
413 if( reps_ptr->reps[0].seg_index != My.seg_index ||
414 Conf_seg_leader( &Membership, My.seg_index) == My.id )
415 {
416 refer_pack.type = REFER_TYPE;
417 refer_pack.proc_id = My.id;
418 refer_pack.memb_id.proc_id = Conf_leader( &Membership );
419 refer_pack.memb_id.time = RING_REP;
420 refer_pack.data_len = 0;
421 send_scat.num_elements = 1;
422 send_scat.elements[0].buf = (char *)&refer_pack;
423 send_scat.elements[0].len = sizeof(packet_header);
424 Net_ucast( pack_ptr->proc_id, &send_scat );
425 }
426 }
427 }else{
428 /* sender belongs to my ring - my token is lost */
429 /* If only one segment exists, then token is not lost, but rather
430 * a JOIN probe for merged segments was received. This can be
431 * ignored because a real token loss will trigger an ALIVE broadcast
432 * packet which will force the membership change.
433 */
434 if ( Cn.num_segments > 1 )
435 Memb_token_loss();
436 }
437 break;
438
439 case SEG:
440 ret = Conf_proc_by_id( pack_ptr->proc_id, &p );
441 if( My.seg_index == p.seg_index &&
442 reps_ptr->reps[0].type == SEG_REP)
443 {
444 /* this guy is my representative */
445 My_seg_rep = pack_ptr->proc_id;
446 Shift_to_represented();
447 for( i=0; i < members_ptr->num_members; i++ )
448 if( members_ptr->members[i] == My.id ) break;
449 Scast_alive( 1 );
450 }else{
451 /* no need to remember this join */
452 }
453 break;
454
455 case REPRESENTED:
456 ret = Conf_proc_by_id( pack_ptr->proc_id, &p );
457 if( My.seg_index == p.seg_index &&
458 reps_ptr->reps[0].type == SEG_REP)
459 {
460 if( pack_ptr->proc_id != My_seg_rep )
461 {
462 Alarm( MEMB,
463 "Memb_handle_join: representative shifts: old %d, new %d\n",
464 My_seg_rep, pack_ptr->proc_id );
465 My_seg_rep = pack_ptr->proc_id;
466 }
467 E_queue( Shift_to_seg, 0, NULL, Rep_timeout );
468 for( i=0; i < members_ptr->num_members; i++ )
469 if( members_ptr->members[i] == My.id ) break;
470 Scast_alive( 1 );
471 }else{
472 /* if My_seg_rep is determined - send it to this guy */
473 /* My_seg_rep can be undetermined if it did not issue a join yet */
474 if( My_seg_rep != -1 )
475 {
476 /* send my_seg_rep to pack_ptr->proc_id */
477 refer_pack.type = REFER_TYPE;
478 refer_pack.proc_id = My.id;
479 refer_pack.memb_id.proc_id = My_seg_rep;
480 refer_pack.memb_id.time = SEG_REP;
481 refer_pack.data_len = 0;
482 send_scat.num_elements = 1;
483 send_scat.elements[0].buf = (char *)&refer_pack;
484 send_scat.elements[0].len = sizeof(packet_header);
485 Net_ucast( pack_ptr->proc_id, &send_scat );
486 }
487 }
488 break;
489
490 case GATHER:
491 /* update my F_reps list */
492 Insert_rep( &F_reps, reps_ptr->reps[0] );
493 for( i=0; i < reps_ptr->num_reps; i++ )
494 {
495 Insert_rep( &Potential_reps, reps_ptr->reps[i] );
496 }
497 /* if sender is my Smallest rep - advance Gather_timeout */
498 if( Smallest_rep( &F_reps, &dummy ) == reps_ptr->reps[0].proc_id )
499 E_queue( Form_or_fail, 0, NULL, Gather_timeout );
500
501 break;
502
503 case FORM:
504 /* Ignore */
505
506 break;
507
508 case EVS:
509 /*
510 * If this guy in my formed membership then my token is lost
511 * Otherwise ignore
512 */
513 if( Conf_id_in_conf( &Future_membership, pack_ptr->proc_id ) != -1 )
514 {
515 Memb_token_loss();
516 /* update my F_members list */
517 Insert_member( &F_members, pack_ptr->proc_id );
518 }
519 break;
520 }
521 }
522
Memb_handle_refer(sys_scatter * scat)523 static void Memb_handle_refer( sys_scatter *scat )
524 {
525 packet_header *pack_ptr;
526 proc p;
527 rep_info temp_rep;
528 int ret;
529
530 pack_ptr = (packet_header *) scat->elements[0].buf;
531
532 switch( State )
533 {
534 case OP:
535 Alarm( MEMB, "Handle_refer in OP\n");
536 break;
537
538 case SEG:
539 Alarm( MEMB, "Handle_refer in SEG\n");
540 break;
541
542 case REPRESENTED:
543 Alarm( MEMB, "Handle_refer in REPRESENTED\n");
544 break;
545
546 case GATHER:
547 Alarm( MEMB, "Handle_refer in GATHER\n");
548 ret = Conf_proc_by_id( pack_ptr->memb_id.proc_id, &p );
549 if( ret < 0 )
550 {
551 Alarm( PRINT, "Memb_handle_refer: unknown proc_id %d\n",
552 pack_ptr->memb_id.proc_id );
553 return;
554 }
555 temp_rep.proc_id = p.id;
556 temp_rep.type = pack_ptr->memb_id.time;
557 temp_rep.seg_index = p.seg_index;
558 Insert_rep( &Potential_reps, temp_rep );
559 break;
560
561 case FORM:
562 Alarm( MEMB, "Handle_refer in FORM\n");
563 break;
564
565 case EVS:
566 Alarm( MEMB, "Handle_refer in EVS\n");
567 break;
568
569 }
570 }
571
Memb_handle_foreign(sys_scatter * scat)572 static void Memb_handle_foreign( sys_scatter *scat )
573 {
574 packet_header *pack_ptr;
575 proc p;
576 int ret;
577
578 pack_ptr = (packet_header *) scat->elements[0].buf;
579
580 switch( State )
581 {
582 case OP:
583 Alarm( NONE, "Handle_foreign in OP\n");
584 ret = Conf_proc_by_id( pack_ptr->proc_id, &p );
585 if( ret < 0 )
586 {
587 Alarm( PRINT, "Memb_handle_foreign: unknown proc_id %d\n",
588 pack_ptr->proc_id );
589 return;
590 }
591 if( Conf_leader( &Membership ) == My.id )
592 {
593 Lookup_new_members();
594 }else if( Conf_seg_leader( &Membership, My.seg_index ) == My.id &&
595 (!Foreign_found) ){
596 /*
597 * Seg leader : sending one foreign message
598 * to Conf leader
599 */
600 sys_scatter send_scat;
601
602 Foreign_found = 1;
603 send_scat.num_elements = 2;
604 send_scat.elements[0].buf = scat->elements[0].buf;
605 send_scat.elements[0].len = scat->elements[0].len;
606 send_scat.elements[1].buf = scat->elements[1].buf;
607 send_scat.elements[1].len = pack_ptr->data_len;
608
609 Net_ucast( Conf_leader( &Membership ), &send_scat );
610 Net_ucast( Conf_leader( &Membership ), &send_scat );
611 }
612 break;
613
614 case SEG:
615 Alarm( NONE, "Handle_foreign in SEG\n");
616 break;
617
618 case REPRESENTED:
619 Alarm( NONE, "Handle_foreign in REPRESENTED\n");
620 break;
621
622 case GATHER:
623 Alarm( NONE, "Handle_foreign in GATHER\n");
624 break;
625
626 case FORM:
627 Alarm( NONE, "Handle_foreign in FORM\n");
628 break;
629
630 case EVS:
631 Alarm( NONE, "Handle_foreign in EVS\n");
632 /*
633 * This may happen when multiple old memberships merge
634 * and a message is retransmitted using bcast or scast.
635 * i.e. more than one proc miss it and another proc recieves it
636 */
637 break;
638
639 }
640 }
641
Memb_handle_form1(sys_scatter * scat)642 static void Memb_handle_form1( sys_scatter *scat )
643 {
644 switch( State )
645 {
646 case OP:
647 Alarm( MEMB, "Handle_form1 in OP\n");
648 if( Conf_leader( &Membership ) == My.id ) /* do nothing */;
649 else Fill_form1( scat );
650 break;
651
652 case SEG:
653 Alarm( MEMB, "Handle_form1 in SEG\n");
654 /* swallow this token */
655 break;
656
657 case REPRESENTED:
658 Alarm( MEMB, "Handle_form1 in REPRESENTED\n");
659 Fill_form1( scat );
660 break;
661
662 case GATHER:
663 Alarm( MEMB, "Handle_form1 in GATHER\n");
664 Fill_form1( scat );
665 break;
666
667 case FORM:
668 Alarm( MEMB, "Handle_form1 in FORM\n");
669 /* swallow this token */
670 break;
671
672 case EVS:
673 Alarm( MEMB, "Handle_form1 in EVS\n");
674 /* swallow this token */
675 break;
676
677 }
678 }
679
Memb_handle_form2(sys_scatter * scat)680 static void Memb_handle_form2( sys_scatter *scat )
681 {
682 switch( State )
683 {
684 case OP:
685 Alarm( MEMB, "Handle_form2 in OP\n");
686 /* swallow this token */
687 break;
688
689 case SEG:
690 Alarm( MEMB, "Handle_form2 in SEG\n");
691 /* swallow this token */
692 break;
693
694 case REPRESENTED:
695 Alarm( MEMB, "Handle_form2 in REPRESENTED\n");
696 /* swallow this token */
697 break;
698
699 case GATHER:
700 Alarm( MEMB, "Handle_form2 in GATHER\n");
701 /* swallow this token */
702 break;
703
704 case FORM:
705 Alarm( MEMB, "Handle_form2 in FORM\n");
706 Read_form2( scat );
707 break;
708
709 case EVS:
710 Alarm( MEMB, "Handle_form2 in EVS\n");
711 /* swallow this token */
712 break;
713 }
714 }
715
Memb_token_loss()716 void Memb_token_loss()
717 {
718 rep_info temp_rep;
719 int i;
720
721 switch( State )
722 {
723 case OP:
724 /* my token is lost - shift to seg */
725
726 Commit_set.num_members = 1;
727 Commit_set.members[0] = My.id;
728
729 Potential_reps.num_reps = 0;
730 temp_rep.type = POTENTIAL_REP;
731 for( i=0; i < Conf().num_segments; i++ )
732 {
733 if( i == My.seg_index ) continue;
734 temp_rep.seg_index = i;
735 if( Membership.segments[i].num_procs > 0 )
736 {
737 temp_rep.proc_id = Membership.segments[i].procs[0]->id;
738 }else{
739 temp_rep.proc_id = Cn.segments[i].procs[0]->id;
740 }
741 Insert_rep( &Potential_reps, temp_rep );
742 }
743 break;
744
745 case SEG:
746 case REPRESENTED:
747 Alarm( EXIT, "Memb_token_loss: bug !!! state is %d\n",State);
748
749 break;
750
751 case GATHER:
752 /* I think I totally solved it */
753 /* If I am not a ring leader it is a bug */
754 if( ! Token_alive )
755 Alarm( EXIT, "Memb_token_loss: bug !!! state is %d\n",State);
756
757 /* I am a ring leader and I lost my ring */
758 Commit_set.num_members = 1;
759 Commit_set.members[0] = My.id;
760
761 Potential_reps.num_reps = 0;
762 temp_rep.type = POTENTIAL_REP;
763
764 /* First adding reps that I gathered - they are more important */
765 /* starting from 1 not to include myself!! */
766 for( i=1; i < F_reps.num_reps; i++ )
767 {
768 temp_rep = F_reps.reps[i];
769 Insert_rep( &Potential_reps, temp_rep );
770 }
771
772 /* Next adding reps from my membership - they are less important */
773 for( i=0; i < Conf().num_segments; i++ )
774 {
775 if( i == My.seg_index ) continue;
776 temp_rep.seg_index = i;
777 if( Membership.segments[i].num_procs > 0 )
778 {
779 temp_rep.proc_id = Membership.segments[i].procs[0]->id;
780 }else{
781 temp_rep.proc_id = Cn.segments[i].procs[0]->id;
782 }
783 Insert_rep( &Potential_reps, temp_rep );
784 }
785 break;
786
787 case FORM:
788 /* my token is lost - shift to seg */
789 /* potential already updated according to form token */
790 break;
791
792 case EVS:
793 /* my token is lost - shift to seg */
794 /* clear empty messages */
795 Backoff_membership();
796
797 /* update potential according to future_membership */
798 Potential_reps.num_reps = 0;
799 temp_rep.type = POTENTIAL_REP;
800 for( i=0; i < Conf().num_segments; i++ )
801 {
802 if( i == My.seg_index ) continue;
803 temp_rep.seg_index = i;
804 if( Future_membership.segments[i].num_procs > 0 )
805 {
806 temp_rep.proc_id =
807 Future_membership.segments[i].procs[0]->id;
808 }else{
809 temp_rep.proc_id = Cn.segments[i].procs[0]->id;
810 }
811 Insert_rep( &Potential_reps, temp_rep );
812 }
813 break;
814 }
815 Alarm( MEMB, "Memb_token_loss: I lost my token, state is %d\n",State);
816 Shift_to_seg();
817
818 Token_alive = 0;
819 E_dequeue( Memb_token_loss, 0, NULL );
820 E_dequeue( Send_join, 0, NULL );
821 E_dequeue( Form_or_fail, 0, NULL );
822 E_dequeue( Prot_token_hurry, 0, NULL );
823 E_dequeue( Lookup_new_members, 0, NULL );
824 Last_token->type = 0;
825 Last_token->seq = 0;
826 Last_token->aru = 0;
827
828 for( i=0; i < Conf().num_segments; i++ )
829 Membership.segments[i].num_procs = 0;
830 Conf_append_id_to_seg(&Membership.segments[My.seg_index], My.id);
831 }
832
Shift_to_seg()833 static void Shift_to_seg()
834 {
835 State = SEG;
836 GlobalStatus.state = SEG;
837
838 F_members.num_members = 1;
839 F_members.members[0] = My.id;
840 F_members.num_pending = 0;
841 E_queue( Scast_alive, 0, NULL, Zero_timeout );
842 E_queue( Gather_or_represented, 0, NULL, Seg_timeout );
843 }
844
Gather_or_represented()845 static void Gather_or_represented()
846 {
847 int dummy;
848
849 My_seg_rep = -1;
850
851 if( Smallest_member( &F_members, &dummy ) == My.id )
852 {
853 Shift_to_gather();
854 }else{
855 Shift_to_represented();
856 }
857 }
858
Shift_to_gather()859 static void Shift_to_gather()
860 {
861 State = GATHER;
862 GlobalStatus.state = GATHER;
863
864 F_reps.num_reps = 1;
865 F_reps.reps[0].proc_id = My.id;
866 F_reps.reps[0].seg_index = My.seg_index;
867 if( Token_alive )
868 F_reps.reps[0].type = RING_REP;
869 else F_reps.reps[0].type = SEG_REP;
870
871 E_dequeue( Scast_alive, 0, NULL );
872 E_dequeue( Lookup_new_members, 0, NULL );
873 E_queue( Send_join, 0, NULL, Zero_timeout );
874 E_queue( Form_or_fail, 0, NULL, Gather_timeout );
875 }
876
Shift_to_represented()877 static void Shift_to_represented()
878 {
879 State = REPRESENTED;
880 GlobalStatus.state = REPRESENTED;
881
882 E_dequeue( Scast_alive, 0, NULL );
883 E_dequeue( Gather_or_represented, 0, NULL );
884 E_queue( Shift_to_seg, 0, NULL, Rep_timeout );
885 }
886
Form_or_fail()887 static void Form_or_fail()
888 {
889
890 rep_info temp_rep;
891 int i;
892 int dummy;
893
894 if( Smallest_rep( &F_reps, &dummy ) == My.id )
895 {
896 if( Token_alive && F_reps.num_reps == 1 )
897 {
898 /* clear everything and go back to op */
899 E_dequeue( Send_join, 0, NULL);
900 E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
901 State = OP;
902 GlobalStatus.state = OP;
903 }else{
904 /* create and send form token */
905 Create_form1();
906 }
907 }else{
908 if( Token_alive )
909 {
910 /* clear everything and go back to op */
911 Alarm( MEMB, "Form_or_fail:failed, return to OP\n");
912 E_dequeue( Send_join, 0, NULL );
913 E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
914 State = OP;
915 GlobalStatus.state = OP;
916 }else{
917 Alarm( MEMB, "Form_or_fail: failed to gather\n");
918 /* failed to gather again */
919 F_members.num_members = 1;
920 F_members.members[0] = My.id;
921 F_members.num_pending = 0;
922
923 Potential_reps.num_reps = 0;
924 temp_rep.type = POTENTIAL_REP;
925 /* starting from 1 not to include myself!! */
926 for( i=1; i < F_reps.num_reps; i++ )
927 {
928 temp_rep = F_reps.reps[i];
929 Insert_rep( &Potential_reps, temp_rep );
930 }
931
932 Shift_to_gather();
933 }
934 }
935 }
936
Scast_alive(int code,void * dummy)937 static void Scast_alive( int code, void *dummy )
938 {
939 packet_header *pack_ptr;
940
941 Alarm( MEMB, "Scast_alive: State is %d\n", State);
942
943 pack_ptr = (packet_header *)Send_pack.elements[0].buf;
944 pack_ptr->type = ALIVE_TYPE;
945 pack_ptr->data_len =
946 2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
947
948 Send_pack.elements[1].buf = (char *)&F_members;
949 Send_pack.elements[1].len = pack_ptr->data_len;
950 Send_pack.num_elements = 2;
951 Net_scast( My.seg_index, &Send_pack );
952 if( code == 0 )
953 E_queue( Scast_alive, 0, NULL, Alive_timeout );
954 }
955
Send_join()956 static void Send_join()
957 {
958 packet_header *pack_ptr;
959 int i;
960
961 Alarm( MEMB, "Send_join: State is %d\n", State);
962
963 pack_ptr = (packet_header *)Send_pack.elements[0].buf;
964 pack_ptr->type = JOIN_TYPE;
965 Send_pack.elements[1].buf = (char *)&F_members;
966 Send_pack.elements[1].len =
967 2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
968 Send_pack.elements[2].buf = (char *)&F_reps;
969 Send_pack.elements[2].len =
970 2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info);
971 Send_pack.num_elements = 3;
972
973 pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len;
974
975 if( !Token_alive )
976 {
977 Net_scast( My.seg_index, &Send_pack );
978 }
979 for( i=0; i < Potential_reps.num_reps; i++ )
980 {
981 Net_ucast( Potential_reps.reps[i].proc_id, &Send_pack );
982 }
983
984 E_queue( Send_join, 0, NULL, Join_timeout );
985 }
986
Lookup_new_members()987 static void Lookup_new_members()
988 {
989 packet_header *pack_ptr;
990 int num_missing;
991 int i,j;
992
993 if( State != OP )
994 {
995 Alarm( MEMB, "Lookup_new_member: not in OP state, returning\n");
996 return;
997 }
998
999 Potential_reps.num_reps = 0;
1000
1001 F_reps.num_reps = 1;
1002 F_reps.reps[0].proc_id = My.id;
1003 F_reps.reps[0].seg_index= My.seg_index;
1004 F_reps.reps[0].type = RING_REP;
1005 F_members.num_members = 0;
1006 F_members.num_pending = 0;
1007
1008 pack_ptr = (packet_header *)Send_pack.elements[0].buf;
1009 pack_ptr->type = JOIN_TYPE;
1010 Send_pack.elements[1].buf = (char *)&F_members;
1011 Send_pack.elements[1].len =
1012 2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
1013 Send_pack.elements[2].buf = (char *)&F_reps;
1014 Send_pack.elements[2].len =
1015 2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info);
1016 Send_pack.num_elements = 3;
1017
1018 pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len;
1019
1020 num_missing = 0;
1021 /* For single segment configured, send local broadcast of join to entire segment -- current members will ignore */
1022 if ( Cn.num_segments == 1 ) {
1023 Net_scast( My.seg_index, &Send_pack );
1024 num_missing++;
1025 } else {
1026 /* Send unicasts to each host that is not in the current membership. */
1027 for( i=0; i < Cn.num_segments; i++ )
1028 {
1029 for( j=0; j < Cn.segments[i].num_procs; j++ )
1030 {
1031 if( Conf_id_in_conf( &Reg_membership, Cn.segments[i].procs[j]->id ) == -1 )
1032 {
1033 Net_ucast( Cn.segments[i].procs[j]->id, &Send_pack );
1034 num_missing++;
1035 }
1036 }
1037 }
1038 }
1039 if( num_missing ) Shift_to_gather();
1040 }
1041
Insert_member(members_info * m,int32 proc_id)1042 static int Insert_member( members_info *m, int32 proc_id )
1043 {
1044 int i;
1045
1046 for( i=0; i < m->num_members; i++ )
1047 if( m->members[i] == proc_id )
1048 {
1049 return( 0 );
1050 }
1051
1052 if (m->num_members == MAX_PROCS_RING)
1053 {
1054 /* members structure is full -- so we ignore this new member */
1055 Alarmp( SPLOG_WARNING, MEMB, "Insert_member: members structure full (%u members) so ignore new member (ID %u.%u.%u.%u)\n", m->num_members, IP1(proc_id), IP2(proc_id), IP3(proc_id), IP4(proc_id) );
1056 return( 0 );
1057 }
1058 m->members[m->num_members] = proc_id;
1059 m->num_members++;
1060 return( 1 );
1061 }
1062
Insert_rep(reps_info * r,rep_info rep)1063 static int Insert_rep( reps_info *r, rep_info rep )
1064 {
1065 proc p;
1066 int ret;
1067 int i;
1068
1069 ret = Conf_proc_by_id( rep.proc_id, &p );
1070 if( ret < 0 )
1071 Alarm( EXIT, "Insert_rep: proc %d not exist\n", rep.proc_id );
1072
1073 if( rep.type == POTENTIAL_REP )
1074 {
1075 /* when Potential - if there is someone from same segment (of any type RING, SEG, POTENTIAL
1076 - don't insert */
1077 for( i=0; i < r->num_reps; i++ )
1078 {
1079 if( r->reps[i].seg_index == p.seg_index )
1080 {
1081 return( 0 );
1082 }
1083 }
1084 }else if( rep.type == SEG_REP ) {
1085 /*
1086 * when Seg - if there is other SEG or RING from same segment - keep all.
1087 * if it is POTENTIAL then replace it and exit because no other from this segment can exist.
1088 */
1089 for( i=0; i < r->num_reps; i++ )
1090 {
1091 if( r->reps[i].seg_index == p.seg_index )
1092 {
1093 if( r->reps[i].type == POTENTIAL_REP )
1094 {
1095 r->reps[i].type = SEG_REP;
1096 r->reps[i].proc_id = p.id;
1097 return( 1 );
1098 }else if( r->reps[i].type == SEG_REP ) {
1099 if (r->reps[i].proc_id == p.id )
1100 return( 0 );
1101 }else if( r->reps[i].type == RING_REP &&
1102 r->reps[i].proc_id == p.id ) {
1103 /* Former RING_REP lost its ring and became SEG_REP */
1104 r->reps[i].type = SEG_REP;
1105 return( 1 );
1106 }
1107 }
1108 }
1109 }else if( rep.type == RING_REP ) {
1110 /*
1111 * when Ring - if it exists exactly (proc_id) - make it from ring,
1112 * in any case elminate potentials.
1113 * If potential exists, it is known that no ring or seg exists.
1114 */
1115 for( i=0; i < r->num_reps; i++ )
1116 {
1117 if( r->reps[i].seg_index == p.seg_index )
1118 {
1119 if( r->reps[i].type == POTENTIAL_REP )
1120 {
1121 r->reps[i].type = RING_REP;
1122 r->reps[i].proc_id = p.id;
1123 return( 1 );
1124 }else if( r->reps[i].type == SEG_REP ) {
1125 if (r->reps[i].proc_id == p.id ) {
1126 r->reps[i].type = RING_REP;
1127 return( 1 );
1128 }
1129 }else if( r->reps[i].type == RING_REP &&
1130 r->reps[i].proc_id == p.id ) {
1131 return( 0 );
1132 }
1133 }
1134 }
1135 }else Alarm( EXIT, "Insert_rep: unknown rep.type %d \n", rep.type );
1136
1137 if (r->num_reps == MAX_REPS)
1138 {
1139 /* reps structure is full -- so we ignore this new rep */
1140 Alarmp( SPLOG_WARNING, MEMB, "Insert_rep: reps structure full (%u reps) so ignore new rep (Type %d SegIndex %u ID %u.%u.%u.%u)\n", r->num_reps, rep.type, rep.seg_index, IP1(rep.proc_id), IP2(rep.proc_id), IP3(rep.proc_id), IP4(rep.proc_id) );
1141 return( 0 );
1142 }
1143 r->reps[r->num_reps] = rep;
1144 r->num_reps++;
1145 return( 1 );
1146 }
1147
Smallest_member(members_info * m,int * index)1148 static int32 Smallest_member( members_info *m, int *index )
1149 {
1150 int current;
1151 int i;
1152 proc curr_proc, i_proc;
1153 int ret;
1154
1155 current = 0;
1156 ret = Conf_proc_by_id( m->members[current], &curr_proc );
1157 if( ret < 0 ) Alarm( EXIT, "Smallest_member: unknown proc_id %d\n",
1158 m->members[current] );
1159 for( i=1; i < m->num_members; i++ )
1160 {
1161 ret = Conf_proc_by_id( m->members[i], &i_proc );
1162 if( ret < 0 ) Alarm( EXIT,
1163 "Smallest_member: Bug! i: %d, proc %d\n",
1164 i, m->members[i] );
1165 if( i_proc.seg_index < curr_proc.seg_index ||
1166 ( i_proc.seg_index == curr_proc.seg_index &&
1167 i_proc.index_in_seg < curr_proc.index_in_seg ) )
1168 {
1169 curr_proc = i_proc;
1170 current = i;
1171 }
1172 }
1173 *index = current;
1174 return( m->members[current] );
1175 }
1176
Smallest_rep(reps_info * r,int * index)1177 static int32 Smallest_rep( reps_info *r, int *index )
1178 {
1179 int current;
1180 int i;
1181 proc curr_p, i_p;
1182 int ret;
1183
1184 current = 0;
1185 for( i=1; i < r->num_reps; i++ )
1186 {
1187 if( r->reps[current].type == SEG_REP )
1188 {
1189 if( r->reps[i].type == SEG_REP )
1190 {
1191 if( r->reps[i].seg_index < r->reps[current].seg_index )
1192 current = i;
1193 }
1194 }else if( r->reps[current].type == RING_REP ) {
1195 if( r->reps[i].type == RING_REP )
1196 {
1197 if( r->reps[i].seg_index < r->reps[current].seg_index )
1198 current = i;
1199 else if( r->reps[i].seg_index == r->reps[current].seg_index )
1200 {
1201 ret = Conf_proc_by_id( r->reps[current].proc_id, &curr_p );
1202 ret = Conf_proc_by_id( r->reps[i].proc_id, &i_p );
1203 if( i_p.index_in_seg < curr_p.index_in_seg )
1204 current = i;
1205 }
1206 }else if( r->reps[i].type == SEG_REP ){
1207 current = i;
1208 }
1209
1210 }else Alarm( EXIT,
1211 "Smallest_rep: bug! current index %d is proc %d of type %d\n",
1212 current,r->reps[current].proc_id,r->reps[current].type );
1213 }
1214 *index = current;
1215 return ( r->reps[current].proc_id );
1216 }
1217
/*
 * Sort_members: reorder m->members[0 .. num_members-1] in place into
 * configuration order (as defined by Smallest_member) using a selection sort
 * over a private copy. Only those array entries are rewritten; num_members
 * and num_pending keep their values.
 */
static void Sort_members( members_info *m )
{
    members_info remaining = *m;
    int          pick;
    int          out;

    for( out = 0; out < m->num_members; out++ )
    {
        /* Move the smallest remaining member into the next output slot ... */
        (void) Smallest_member( &remaining, &pick );
        m->members[out] = remaining.members[pick];

        /* ... and plug its hole with the last remaining entry. */
        remaining.num_members--;
        remaining.members[pick] = remaining.members[remaining.num_members];
    }
}
1235
/*
 * Sort_reps: reorder r->reps[0 .. num_reps-1] in place into the order
 * defined by Smallest_rep, using a selection sort over a private copy.
 * SEG_REPs come first by segment, then RING_REPs by segment and slot.
 * Only those array entries are rewritten; num_reps and rep_index keep their
 * values.
 */
static void Sort_reps( reps_info *r )
{
    reps_info remaining = *r;
    int       pick;
    int       out;

    for( out = 0; out < r->num_reps; out++ )
    {
        /* Move the smallest remaining rep into the next output slot ... */
        (void) Smallest_rep( &remaining, &pick );
        r->reps[out] = remaining.reps[pick];

        /* ... and plug its hole with the last remaining entry. */
        remaining.num_reps--;
        remaining.reps[pick] = remaining.reps[remaining.num_reps];
    }
}
1253
Create_form1()1254 static void Create_form1()
1255 {
1256 token_header form_token;
1257 ring_info *rg_info;
1258 int32 *num_rings;
1259 int32 *holes_procs_ptr;
1260 int32 index;
1261 int pack_entry;
1262 int num_bytes;
1263 sys_scatter send_scat;
1264 char rg_info_buf[sizeof(token_body)];
1265 rep_info temp_rep;
1266 int i,j;
1267 int cur_num_members;
1268 members_info valid_members;
1269
1270 form_token.seq = Highest_seq+3333;
1271 form_token.proc_id = My.id;
1272 form_token.type = FORM1_TYPE;
1273
1274 /* if I am a ring leader - update my F_members */
1275 if( F_reps.reps[0].type == RING_REP )
1276 {
1277 F_members.num_members = 0;
1278 for( i=0; i < Membership.num_segments; i++ )
1279 for( j=0; j < Membership.segments[i].num_procs; j++ )
1280 {
1281 F_members.members[F_members.num_members] =
1282 Membership.segments[i].procs[j]->id;
1283 F_members.num_members++;
1284 }
1285 }
1286
1287 cur_num_members = 0;
1288 for ( i = 0; i < F_members.num_members; i++ ) {
1289 int invalid_member;
1290 invalid_member = 0;
1291
1292 /* Remove from F_members any members that are also in OUR F_reps (except myself). */
1293 for ( j = 0; j < F_reps.num_reps; j++ ) {
1294 if ( (F_members.members[i] == F_reps.reps[j].proc_id ) &&
1295 (F_members.members[i] != My.id) ) {
1296 invalid_member = 1;
1297 break;
1298 }
1299 }
1300 if (!invalid_member) {
1301 valid_members.members[cur_num_members] = F_members.members[i];
1302 cur_num_members++;
1303 }
1304 }
1305 memcpy( &F_members.members[0], &valid_members.members[0], cur_num_members * sizeof(int32) );
1306 F_members.num_members = cur_num_members;
1307
1308 /* I am the first in F_members. put the rest in pending */
1309 F_members.num_pending = F_members.num_members - 1;
1310 F_members.num_members = 1;
1311
1312 Sort_reps( &F_reps );
1313 F_reps.rep_index = 1;
1314 /* update potential in case of failure */
1315 Potential_reps.num_reps = 0;
1316 for( i=0; i < F_reps.num_reps; i++ )
1317 {
1318 temp_rep = F_reps.reps[i];
1319 if( temp_rep.seg_index != My.seg_index )
1320 {
1321 temp_rep.type = POTENTIAL_REP;
1322 Insert_rep( &Potential_reps, temp_rep );
1323 }
1324 }
1325
1326 num_bytes = 0;
1327 num_rings = (int32 *)rg_info_buf;
1328 *num_rings= 1;
1329 num_bytes += sizeof(int32);
1330 rg_info = (ring_info *)&rg_info_buf[num_bytes];
1331 num_bytes += sizeof(ring_info);
1332 holes_procs_ptr = (int32 *)&rg_info_buf[num_bytes];
1333
1334 rg_info->memb_id = Membership_id;
1335 rg_info->trans_time = 0;
1336 rg_info->aru = Aru;
1337 rg_info->highest_seq = Highest_seq;
1338
1339 /* update holes */
1340 rg_info->num_holes = 0;
1341 for( index = My_aru+1; index <= Highest_seq; index++ )
1342 {
1343 pack_entry = index & PACKET_MASK;
1344 if( ! Packets[pack_entry].exist )
1345 {
1346 *holes_procs_ptr = index;
1347 Alarm( MEMB ,
1348 "INSERT HOLE 1 IS %d My_aru is %d, Highest_seq is %d\n",
1349 index,My_aru, Highest_seq);
1350 holes_procs_ptr++;
1351 num_bytes += sizeof(int32);
1352 rg_info->num_holes++;
1353 }
1354 }
1355
1356 /* update commit-trans procs */
1357
1358 /* insert self in trans and commit */
1359 rg_info->num_commit = 1;
1360 rg_info->num_trans = 1;
1361 *holes_procs_ptr = My.id;
1362 num_bytes += sizeof(int32);
1363 holes_procs_ptr++;
1364
1365 /* insert other members of commit set */
1366 for( i=0; i < Commit_set.num_members; i++ )
1367 {
1368 /* skipping self, because already there */
1369 if( Commit_set.members[i] == My.id ) continue;
1370
1371 /* insert this member */
1372 *holes_procs_ptr = Commit_set.members[i];
1373 holes_procs_ptr++;
1374 num_bytes += sizeof(int32);
1375 rg_info->num_commit++;
1376 }
1377
1378 send_scat.num_elements = 4;
1379 send_scat.elements[0].buf = (char *)&form_token;
1380 send_scat.elements[0].len = sizeof(token_header);
1381 send_scat.elements[1].buf = (char *)&F_members;
1382 send_scat.elements[1].len = sizeof(members_info);
1383 send_scat.elements[2].buf = (char *)&F_reps;
1384 send_scat.elements[2].len = sizeof(reps_info);
1385 send_scat.elements[3].buf = rg_info_buf;
1386 send_scat.elements[3].len = num_bytes;
1387
1388 form_token.rtr_len = send_scat.elements[1].len + send_scat.elements[2].len +
1389 send_scat.elements[3].len;
1390
1391 /* compute whom to send to */
1392 if( F_members.num_pending > 0 )
1393 {
1394 /* send to next member in pending list */
1395 Net_ucast_token( F_members.members[F_members.num_members],
1396 &send_scat );
1397 Net_ucast_token( F_members.members[F_members.num_members],
1398 &send_scat );
1399 }else if( F_reps.rep_index < F_reps.num_reps){
1400 /* send to next rep */
1401 Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat );
1402 Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat );
1403 }else{
1404 /* singleton membership */
1405 F_members.num_pending = 1;
1406 F_members.num_members = 0;
1407 form_token.type = FORM2_TYPE;
1408 send_scat.elements[2].len = sizeof(membership_id);
1409 form_token.rtr_len = send_scat.elements[1].len +
1410 send_scat.elements[2].len + send_scat.elements[3].len;
1411 Net_ucast_token( My.id, &send_scat );
1412 }
1413
1414 E_dequeue( Send_join, 0, NULL );
1415 E_queue( Memb_token_loss, 0, NULL, Form_timeout );
1416 E_dequeue( Prot_token_hurry, 0, NULL );
1417 Token_alive = 0;
1418
1419 State = FORM;
1420 GlobalStatus.state = FORM;
1421 }
1422
/*
 * Fill_form1: process a FORM1 token received from the previous hop, add
 * this daemon's contribution, and forward it (or turn it into a FORM2).
 *
 * Token layout:
 *   - scat->elements[0] holds the token_header.
 *   - scat->elements[1] holds, back to back: a members_info, a reps_info and
 *     the rings buffer (int32 ring count followed by variable-length
 *     ring_info records).
 *   - These are byte-swapped in place when the sender's endianness differs.
 *
 * Depending on State:
 *   OP / REPRESENTED - I must be the next pending member; I move myself
 *                      from pending to active.
 *   GATHER           - I must be the rep at rep_index, with no member still
 *                      pending. I append my (filtered) segment members
 *                      (SEG_REP) or every proc of my ring (RING_REP), the
 *                      first appended entry becoming active and the rest
 *                      pending, then advance rep_index.
 *   anything else    - fatal.
 * A token that fails the validity check for the current state is ignored:
 * the function returns without sending anything or changing State.
 *
 * The rings buffer is rebuilt in a local buffer:
 *   - ring_info records of other rings are copied unchanged;
 *   - the record for my current ring (Membership_id) is re-created at the
 *     end, merging my Aru, Highest_seq, holes and commit/trans sets into any
 *     record for my ring that was already present;
 *   - if my ring has no record yet and adding one would exceed
 *     MAX_FORM_REPS rings, I remove myself from the member list instead.
 *
 * Forwarding:
 *   - to the next pending member, if any;
 *   - else to the next rep;
 *   - if neither remains, the member list is sorted and the token is sent as
 *     FORM2 to the first member.
 * Finally the join/form timers are adjusted and State becomes FORM.
 */
static void Fill_form1( sys_scatter *scat )
{
    sys_scatter   send_scat;
    token_header *form_token;
    members_info *m_info;
    reps_info    *r_info;
    ring_info    *old_rg_info, *new_rg_info;
    ring_info    *my_rg_info;
    int32        *old_num_rings, *new_num_rings;
    int32        *my_holes_procs_ptr, *new_holes_procs_ptr;
    int32         index;
    int           pack_entry;
    char          rg_info_buf[sizeof(token_body)];
    char         *c_ptr;
    char         *rings_buf;
    int           num_bytes;
    int           bytes_to_copy;
    rep_info      temp_rep;
    int           i,j,k;
    int           cur_num_members;
    int           num_to_copy;
    int           removed_self;
    members_info  valid_members;

    num_bytes = 0;

    form_token = (token_header *)scat->elements[0].buf;

    m_info = (members_info *)scat->elements[1].buf;
    num_bytes += sizeof(members_info);

    r_info = (reps_info *)&scat->elements[1].buf[num_bytes];
    num_bytes += sizeof(reps_info);

    rings_buf = &scat->elements[1].buf[num_bytes];
    old_num_rings = (int32 *)&scat->elements[1].buf[num_bytes];
    num_bytes += sizeof(int32);

    old_rg_info= (ring_info *)&scat->elements[1].buf[num_bytes];

    if( !Same_endian( form_token->type ) )
    {
        Flip_members( m_info );
        Flip_reps( r_info );
        Flip_rings( rings_buf );
    }

    /* update header */
    form_token->proc_id = My.id;
    if( form_token->seq < Highest_seq+3333 ) form_token->seq = Highest_seq+3333;

    /* update members and reps */
    if( State == OP || State == REPRESENTED )
    {
        /* Validity check: I must be the next pending member. The counts come
         * from the network, so they are checked before num_members is used
         * as an index. members[num_members] is out of bounds when the list
         * is already full and nothing is pending. */
        if( m_info->num_pending <= 0 ||
            m_info->num_members < 0 ||
            m_info->num_members + m_info->num_pending > MAX_PROCS_RING ||
            m_info->members[m_info->num_members] != My.id ) return;

        m_info->num_members++;
        m_info->num_pending--;

    }else if( State == GATHER ){

        /* Validity check: no member may still be pending, and I must be the
         * rep at rep_index. rep_index comes from the network, so it is range
         * checked before being used as an index. */
        if( m_info->num_pending != 0 ||
            r_info->rep_index < 0 ||
            r_info->rep_index >= r_info->num_reps ||
            r_info->rep_index >= MAX_REPS ||
            r_info->reps[r_info->rep_index].proc_id != My.id )
        {
            return;
        }

        /*
         * Add validation check to my F_members list. I need to remove members:
         * 1) Any members already in the m_info I just received in form1.
         * 2) Any members of r_info I just received in form1 (except myself).
         * 3) Any members that are also in OUR F_reps (except myself).
         * The reason for this is that these guys also became seg_reps
         * from the same segment as me, but did not make it to the final reps list on form1
         * and if included as members will act as seg_reps which they should not.
         */
        cur_num_members = 0;
        for ( i = 0; i < F_members.num_members; i++ ) {
            int invalid_member;
            invalid_member = 0;

            /* 1) Any members already in the m_info I just received in form1. */
            for ( j = 0; j < m_info->num_members; j++ ) {
                if (F_members.members[i] == m_info->members[j] ) {
                    invalid_member = 1;
                    break;
                }
            }

            /* 2) Any members of r_info I just received in form1 (except myself). */
            for ( j = 0; !invalid_member && j < r_info->num_reps; j++ ) {
                if ( (F_members.members[i] == r_info->reps[j].proc_id ) &&
                     (F_members.members[i] != My.id) ) {
                    invalid_member = 1;
                    break;
                }
            }

            /* 3) Any members that are also in OUR F_reps (except myself). */
            for ( j = 0; !invalid_member && j < F_reps.num_reps; j++ ) {
                if ( (F_members.members[i] == F_reps.reps[j].proc_id ) &&
                     (F_members.members[i] != My.id) ) {
                    invalid_member = 1;
                    break;
                }
            }
            if (!invalid_member) {
                valid_members.members[cur_num_members] = F_members.members[i];
                cur_num_members++;
            }
        }
        memcpy( &F_members.members[0], &valid_members.members[0], cur_num_members * sizeof(int32) );
        F_members.num_members = cur_num_members;

        /* Now add all my members into the form1 token */
        if( r_info->reps[r_info->rep_index].type == SEG_REP )
        {
            /* Fill myself and my members: the first appended entry
             * (presumably myself, F_members[0]) becomes active, the rest are
             * pending. */
            i = m_info->num_members;
            for( j=0; j < F_members.num_members; j++, i++ )
                m_info->members[i] = F_members.members[j];
            m_info->num_pending = F_members.num_members - 1;
            m_info->num_members += 1;

        }else if( r_info->reps[r_info->rep_index].type == RING_REP ){

            /* Fill myself and my ring members: every proc of my current
             * membership is appended in segment order. The first becomes
             * active, the rest are pending. */
            i = m_info->num_members;
            for( j=0; j < Membership.num_segments; j++ )
                for( k=0; k < Membership.segments[j].num_procs; k++, i++ )
                    m_info->members[i] =
                        Membership.segments[j].procs[k]->id;
            m_info->num_pending = i - m_info->num_members -1;
            m_info->num_members += 1;
        }else Alarm( EXIT, "Fill_form1: invalid rep type: %d\n",
                     r_info->reps[r_info->rep_index].type );

        r_info->rep_index++;

    }else Alarm( EXIT, "Fill_form1: invalid State: %d\n",State );

    /* Update potential reps in case of failure: every rep from a segment
     * other than mine is recorded as a POTENTIAL_REP. */
    Potential_reps.num_reps = 0;
    for( i=0; i < r_info->num_reps; i++ )
    {
        temp_rep = r_info->reps[i];
        if( temp_rep.seg_index != My.seg_index )
        {
            temp_rep.type = POTENTIAL_REP;
            Insert_rep( &Potential_reps, temp_rep );
        }
    }


    /* Update rings info. Copy every ring record that is not mine into the
     * local buffer, and remember where my ring's record (if any) lives in
     * the received token. */
    num_bytes = 0;
    new_num_rings = (int32 *)&rg_info_buf[num_bytes];
    *new_num_rings = 0;
    num_bytes += sizeof(int32);

    my_rg_info = NULL;
    my_holes_procs_ptr = NULL; /* optimiser thinks it may be used
                                  uninitialised (its wrong, but it
                                  is too subtle for it) */
    for( i=0; i < *old_num_rings; i++ )
    {
        /* each record: ring_info header + holes + commit procs */
        bytes_to_copy = sizeof(ring_info) +
            ( old_rg_info->num_holes + old_rg_info->num_commit )* sizeof(int32);
        if( Memb_is_equal( old_rg_info->memb_id, Membership_id ) )
        {
            my_rg_info = old_rg_info;
            c_ptr = (char *) old_rg_info;
            my_holes_procs_ptr = (int32 *)&c_ptr[sizeof(ring_info)];
            old_rg_info = (ring_info *)&c_ptr[bytes_to_copy];
        }else{
            new_rg_info= (ring_info *)&rg_info_buf[num_bytes];
            memmove((char *)new_rg_info, (char *)old_rg_info, bytes_to_copy );
            c_ptr = (char *) old_rg_info;
            old_rg_info = (ring_info *)&c_ptr[bytes_to_copy];
            num_bytes += bytes_to_copy;
            (*new_num_rings)++;
        }
    }

    /* Append a fresh record for my ring at the end of the buffer. */
    new_rg_info= (ring_info *)&rg_info_buf[num_bytes];
    num_bytes += sizeof(ring_info);
    (*new_num_rings)++;

    new_rg_info->memb_id = Membership_id;
    new_rg_info->trans_time = 0;
    new_rg_info->num_holes = 0;
    new_holes_procs_ptr = (int32 *)&rg_info_buf[num_bytes];

    new_rg_info->aru = Aru;
    new_rg_info->highest_seq = Highest_seq;

    if( my_rg_info == NULL )
    {

        if ( *new_num_rings > MAX_FORM_REPS )
        {
            /* This ring_info entry will NOT fit in the FORM token packet.
             * So since the ring_info is needed for me (this daemon) to be
             * included in the current membership ring, we will have
             * to remove ourselves from the m_info list and not
             * create this ring_info
             */
            Alarmp( SPLOG_WARNING, MEMB, "Fill_form1: ring_info entry for %u.%u.%u.%u will not fit in FORM token. Removing self from current membership attempt by removing IP from m_info list\n", IP1(My.id), IP2(My.id), IP3(My.id), IP4(My.id));
            /* since new ring is always at the end, we just decrease current byte count */
            num_bytes = num_bytes - sizeof(ring_info);
            (*new_num_rings)--;

            /* Remove ourselves from m_info, shifting every later active and
             * pending entry down one slot. */
            removed_self = 0;
            for ( i=0; i < m_info->num_members; i++)
            {
                if( m_info->members[i] == My.id )
                {
                    num_to_copy = m_info->num_members + m_info->num_pending - i - 1;
                    memmove(&m_info->members[i], &m_info->members[i+1], num_to_copy * sizeof(int32));
                    removed_self = 1;
                    break;
                }
            }
            /* Only shrink the active list if an entry was actually removed;
             * otherwise the last active member would be dropped by mistake. */
            if( removed_self )
                m_info->num_members--;
            else
                Alarmp( SPLOG_WARNING, MEMB, "Fill_form1: did not find self in m_info active list\n" );

        } else {
            /* New ring_info will fit, so create it. Holes: every sequence
             * number in (Last_discarded, Highest_seq] whose packet slot is
             * empty. */
            for( index = Last_discarded+1; index <= Highest_seq; index++ )
            {
                pack_entry = index & PACKET_MASK;
                if( ! Packets[pack_entry].exist )
                {
                    *new_holes_procs_ptr = index;
                    Alarm( MEMB , "INSERT HOLE 2 IS %d\n",index);
                    new_holes_procs_ptr++;
                    num_bytes += sizeof(int32);
                    new_rg_info->num_holes++;
                }
            }

            /* update commit-trans procs */

            /* insert self in trans and commit */
            new_rg_info->num_commit = 1;
            new_rg_info->num_trans = 1;
            *new_holes_procs_ptr = My.id;
            new_holes_procs_ptr++;
            num_bytes += sizeof(int32);

            /* insert other members of commit set */
            for( i=0; i < Commit_set.num_members; i++ )
            {
                /* skipping self, because already there */
                if( Commit_set.members[i] == My.id ) continue;

                /* insert this member */
                *new_holes_procs_ptr = Commit_set.members[i];
                new_holes_procs_ptr++;
                num_bytes += sizeof(int32);
                new_rg_info->num_commit++;
            }
        }
    }else{

        /* A record for my ring already exists: merge my state into it. */
        members_info temp_set;

        if( my_rg_info->aru > Aru )
            new_rg_info->aru = my_rg_info->aru;

        if( my_rg_info->highest_seq > Highest_seq )
            new_rg_info->highest_seq= my_rg_info->highest_seq;

        /* keep only the recorded holes that I am still missing */
        for( i=0; i < my_rg_info->num_holes; i++ )
        {
            pack_entry = *my_holes_procs_ptr & PACKET_MASK;
            if( ! Packets[pack_entry].exist )
            {
                *new_holes_procs_ptr = *my_holes_procs_ptr;
                Alarm( MEMB ,
                       "INSERT HOLE 3 IS %d My_aru is %d, Highest_seq is %d\n",
                       *new_holes_procs_ptr,My_aru, Highest_seq);
                new_holes_procs_ptr++;
                num_bytes += sizeof(int32);
                new_rg_info->num_holes++;
            }
            my_holes_procs_ptr++;
        }

        /* add holes for sequence numbers beyond the recorded highest_seq */
        if( my_rg_info->highest_seq < Highest_seq )
        {
            for( index = my_rg_info->highest_seq+1;
                 index <= Highest_seq; index++ )
            {
                pack_entry = index & PACKET_MASK;
                if( ! Packets[pack_entry].exist )
                {
                    Alarm( MEMB , "INSERT HOLE 4 IS %d\n",index);
                    *new_holes_procs_ptr = index;
                    new_holes_procs_ptr++;
                    num_bytes += sizeof(int32);
                    new_rg_info->num_holes++;
                }
            }
        }
        /* setting temp_set to be trans members only */
        temp_set.num_members = 0;
        for( i = 0; i < my_rg_info->num_trans; i++ )
        {
            Insert_member( &temp_set, *my_holes_procs_ptr);
            my_holes_procs_ptr++;
        }

        /* creating an updated temp_set based on my_rg_info and Commit_set */

        /* adding self to trans members */
        Insert_member( &temp_set, My.id );
        if( temp_set.num_members != (my_rg_info->num_trans + 1) )
            Alarm( EXIT, "Fill_form1: incorrect trans set\n");
        temp_set.num_pending = my_rg_info->num_trans+1;

        /* adding rest of original commit set */
        for( i = my_rg_info->num_trans; i < my_rg_info->num_commit; i++ )
        {
            Insert_member( &temp_set, *my_holes_procs_ptr );
            my_holes_procs_ptr++;
        }

        /* adding my commit set */
        for( i = 0; i < Commit_set.num_members; i++ )
        {
            Insert_member( &temp_set, Commit_set.members[i] );
        }

        /* writing my ring commit and trans information to new_rg_info
         * (temp_set.num_pending is reused here as the trans count) */
        new_rg_info->num_commit = temp_set.num_members;
        new_rg_info->num_trans = temp_set.num_pending;
        for( i = 0; i < temp_set.num_members; i++ )
        {
            *new_holes_procs_ptr = temp_set.members[i];
            new_holes_procs_ptr++;
            num_bytes += sizeof(int32);
        }
    }

    send_scat.num_elements = 4;
    send_scat.elements[0].buf = (char *)form_token;
    send_scat.elements[0].len = sizeof(token_header);
    send_scat.elements[1].buf = (char *)m_info;
    send_scat.elements[1].len = sizeof(members_info);
    send_scat.elements[2].buf = (char *)r_info;
    send_scat.elements[2].len = sizeof(reps_info);
    send_scat.elements[3].buf = rg_info_buf;
    send_scat.elements[3].len = num_bytes;

    form_token->rtr_len = send_scat.elements[1].len + send_scat.elements[2].len +
                          send_scat.elements[3].len;

    /* compute whom to send to */
    if( m_info->num_pending > 0 )
    {
        /* send to next member in pending list (the token is sent twice) */
        Net_ucast_token( m_info->members[m_info->num_members],
                         &send_scat );
        Net_ucast_token( m_info->members[m_info->num_members],
                         &send_scat );
    }else if( r_info->rep_index < r_info->num_reps){
        /* send to next rep (the token is sent twice) */
        Net_ucast_token( r_info->reps[r_info->rep_index].proc_id,
                         &send_scat );
        Net_ucast_token( r_info->reps[r_info->rep_index].proc_id,
                         &send_scat );
    }else{
        /* prepare form2 token: all members sorted and pending */
        Sort_members( m_info );
        m_info->num_pending = m_info->num_members;
        m_info->num_members = 0;
        form_token->type = FORM2_TYPE;
        /* this is the only difference between form1 and form2 tokens */
        send_scat.elements[2].len = sizeof(membership_id);
        form_token->rtr_len = send_scat.elements[1].len +
                              send_scat.elements[2].len + send_scat.elements[3].len;
        Net_ucast_token( m_info->members[0], &send_scat );
        Net_ucast_token( m_info->members[0], &send_scat );
    }

    E_dequeue( Send_join, 0, NULL );
    E_dequeue( Form_or_fail, 0, NULL );
    E_dequeue( Shift_to_seg, 0, NULL );
    E_queue( Memb_token_loss, 0, NULL, Form_timeout );
    E_dequeue( Prot_token_hurry, 0, NULL );
    Token_alive = 0;

    State = FORM;
    GlobalStatus.state = FORM;
}
1819
/*
 * Read_form2: process a FORM2 token, which carries the final member list of
 * the new configuration, and install it as the future membership.
 *
 * Token layout:
 *   - scat->elements[0] holds the token_header.
 *   - scat->elements[1] holds, back to back: a members_info, the new
 *     membership_id and the rings buffer (int32 count followed by ring_info
 *     records).
 *   - These are byte-swapped in place when needed.
 * The token is ignored unless I am the next pending member.
 *
 * The member that becomes the first active one (num_members == 1) fills in
 * the new membership id. Every member then:
 *   - builds Future_membership / Future_membership_id and passes the former
 *     to Net_set_membership and FC_new_configuration;
 *   - locates the ring_info record of its current ring (fatal if absent);
 *   - adopts that record's Highest_seq and Aru and calls Discard_packets;
 *   - marks each listed hole as a dummy message (exist = 3);
 *   - loads Future_commit_set (num_pending holds the trans count);
 *   - sets F_trans_memb_time, stamping the ring's trans_time first if it is
 *     still zero.
 * The last member of Future_membership turns the token into the first
 * regular token (type 0) and sends it once; everyone else forwards the FORM2
 * token twice. The token-loss timer is queued and State becomes EVS.
 */
static void Read_form2( sys_scatter *scat )
{
    sys_scatter    send_scat;
    token_header  *form_token;
    members_info  *m_info;
    membership_id *m_id_info;
    ring_info     *rg_info;
    ring_info     *my_rg_info;
    int32         *my_holes_procs_ptr;
    int32         *num_rings;
    int            pack_entry;
    char          *c_ptr;
    char          *rings_buf;
    int            num_bytes;
    int            bytes_to_skip;
    proc           p;
    int            ret;
    int            i;
    int32          memb_time = 0;

    num_bytes = 0;

    form_token = (token_header *)scat->elements[0].buf;

    m_info = (members_info *)scat->elements[1].buf;
    num_bytes += sizeof(members_info);

    m_id_info = (membership_id *)&scat->elements[1].buf[num_bytes];
    num_bytes += sizeof(membership_id);

    rings_buf = &scat->elements[1].buf[num_bytes];
    num_rings = (int32 *)&scat->elements[1].buf[num_bytes];
    num_bytes += sizeof(int32);

    rg_info= (ring_info *)&scat->elements[1].buf[num_bytes];

    if( !Same_endian( form_token->type ) )
    {
        Flip_members( m_info );
        m_id_info->proc_id = Flip_int32( m_id_info->proc_id );
        m_id_info->time = Flip_int32( m_id_info->time );
        Flip_rings( rings_buf );
    }

    form_token->proc_id = My.id;

    /* Validity check: I must be the next pending member. The counts come
     * from the network, so they are checked before num_members is used as an
     * index. members[num_members] is out of bounds when the list is already
     * full, and the member loop below walks num_members + num_pending
     * entries. */
    if( m_info->num_pending <= 0 ||
        m_info->num_members < 0 ||
        m_info->num_members + m_info->num_pending > MAX_PROCS_RING ||
        m_info->members[m_info->num_members] != My.id ) return;

    m_info->num_members++;
    m_info->num_pending--;

    Last_seq = form_token->seq;

    if( m_info->num_members == 1 )
    {
        /* The time in memb_time is saved to use on one of the
         * Trans_memb_ids (there is one per Trans_membership view).
         * The time for the real membership id will be one past
         * the one in memb_time, because I want time to go forward.
         */
        memb_time = E_get_time().sec;
        if( memb_time <= Last_time_used )
            memb_time = Last_time_used + 1;
        Last_time_used = memb_time;

        /* I am future leader, fill membership_id */
        m_id_info->proc_id = My.id;
        m_id_info->time = ++Last_time_used;
    }

    /* build Future membership and Future membership id: start from the
     * configuration with every segment emptied, then add each member to its
     * segment */
    Future_membership_id.proc_id = m_id_info->proc_id;
    Future_membership_id.time = m_id_info->time;
    Future_membership = Conf();
    for( i=0; i < Future_membership.num_segments; i++ )
        Future_membership.segments[i].num_procs = 0;
    for( i=0; i < (m_info->num_members + m_info->num_pending); i++ )
    {
        ret = Conf_proc_by_id( m_info->members[i], &p );
        if( ret < 0 ) Alarm( EXIT, "Read_form2: no such id %u\n",
                             m_info->members[i] );
        if ( Conf_append_id_to_seg( &Future_membership.segments[p.seg_index], p.id) == -1)
            Alarm( EXIT, "Read_form2: BUG2 no such id %u\n", p.id);
    }
    Net_set_membership( Future_membership );
    FC_new_configuration( );

    /* get my ring info: walk the variable-length records (header + holes +
     * commit procs) until the one for Membership_id. num_bytes ends up
     * covering the whole rings buffer. */
    my_rg_info = NULL;
    my_holes_procs_ptr = NULL;
    for( i=0; i < *num_rings; i++ )
    {
        bytes_to_skip = sizeof(ring_info) +
            ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
        if( Memb_is_equal( rg_info->memb_id, Membership_id ) )
        {
            my_rg_info = rg_info;
            my_holes_procs_ptr =
                (int32 *)&scat->elements[1].buf[num_bytes+sizeof(ring_info)];
        }
        c_ptr = (char *) rg_info;
        rg_info = (ring_info *)&c_ptr[bytes_to_skip];
        num_bytes += bytes_to_skip;
    }

    if (my_rg_info == NULL) {
        Alarm(EXIT, "Read_form2: num_rings = %d, num_bytes = %d, Memb_id = (%d %d)\n",
              *num_rings, num_bytes, Membership_id.proc_id, Membership_id.time);
    }

    Highest_seq = my_rg_info->highest_seq;
    Aru = my_rg_info->aru;
    /* Note: this call to Discard_packets handles delivery of all the messages
     * from the old membership with sequence numbers prior to the old Aru.
     */
    Discard_packets();

    for( i=0; i < my_rg_info->num_holes; i++ )
    {
        /* create dummy messages */
        pack_entry = *my_holes_procs_ptr & PACKET_MASK;
        Alarm( MEMB , "EXTRACT HOLE IS %d\n",*my_holes_procs_ptr);
        if( Packets[pack_entry].exist != 0 )
            Alarm( EXIT, "Read_form2: seq %d should be a hole, but is %d\n",
                   *my_holes_procs_ptr, Packets[pack_entry].exist );
        Packets[pack_entry].exist = 3;
        my_holes_procs_ptr++;
    }

    /* extract future commit set (and future trans membership: the first
     * num_trans entries) */
    Future_commit_set.num_members = my_rg_info->num_commit;
    Future_commit_set.num_pending = my_rg_info->num_trans;
    for( i=0; i < my_rg_info->num_commit; i++ )
    {
        Future_commit_set.members[i] = *my_holes_procs_ptr;
        my_holes_procs_ptr++;
    }

    /* The token circulates in conf order, which also defines the order
     * by which we choose "leaders." So, if noone else has set the id
     * for my ring, I get to, and I'll be leader. */
    if( !my_rg_info->trans_time )
    {
        /* memb_time could be already set, if this daemon is the
         * future leader */
        if( !memb_time )
        {
            memb_time = E_get_time().sec;
            if( memb_time <= Last_time_used )
                memb_time = Last_time_used + 1;
            Last_time_used = memb_time;
        }
        my_rg_info->trans_time = memb_time;
    }
    F_trans_memb_time = my_rg_info->trans_time;

    send_scat.num_elements = 2;
    send_scat.elements[0].buf = scat->elements[0].buf;
    send_scat.elements[0].len = sizeof(token_header);
    send_scat.elements[1].buf = scat->elements[1].buf;
    send_scat.elements[1].len = num_bytes;

    if( Conf_last( &Future_membership ) != My.id )
    {
        Net_send_token( &send_scat );
        Net_send_token( &send_scat );
        Token_rounds = 0;

    }else{
        /* build first regular token */
        send_scat.num_elements = 1;

        form_token->type = 0;
        form_token->seq = 0;
        form_token->aru = Last_seq;
        form_token->flow_control = 0;
        form_token->rtr_len = 0;

        Net_send_token( &send_scat );
        Token_rounds = 1;
    }
    Token_alive = 1;
    E_queue( Memb_token_loss, 0, NULL, Token_timeout );

    Last_token->type = 0;
    Last_token->seq = 0;
    Last_token->aru = 0;

    State = EVS;
    GlobalStatus.state = EVS;
}
2013
Memb_print_form_token(sys_scatter * scat)2014 void Memb_print_form_token( sys_scatter *scat )
2015 {
2016 token_header *form_token;
2017 members_info *m_info;
2018 reps_info *r_info = NULL; /* avoids compile warning -- gcc not detect initialization */
2019 membership_id *m_id_info = NULL; /* avoids compile warning -- gcc not detect initialization */
2020 ring_info *rg_info;
2021 int32 *num_rings;
2022 int32 *commit_id;
2023 char *c_ptr;
2024 int num_bytes, bytes_to_skip;
2025 int i,j, scat_index;
2026 int is_form1 = 0;
2027 char form_name[10];
2028
2029 num_bytes = 0;
2030 scat_index = 1;
2031 form_token = (token_header *)scat->elements[0].buf;
2032
2033 m_info = (members_info *)&scat->elements[scat_index].buf[num_bytes];
2034 num_bytes += sizeof(members_info);
2035
2036 if (num_bytes == scat->elements[scat_index].len )
2037 {
2038 num_bytes = 0;
2039 scat_index++;
2040 }
2041
2042 if ( Is_form1( form_token->type ) )
2043 {
2044 r_info = (reps_info *)&scat->elements[scat_index].buf[num_bytes];
2045 num_bytes += sizeof(reps_info);
2046 is_form1 = 1;
2047 } else if( Is_form2( form_token->type ) )
2048 {
2049 m_id_info = (membership_id *)&scat->elements[scat_index].buf[num_bytes];
2050 num_bytes += sizeof(membership_id);
2051 is_form1 = 0;
2052 } else {
2053 Alarm( EXIT, "Invalid token type received: 0x%x\n", form_token->type);
2054 return;
2055 }
2056
2057 if (num_bytes == scat->elements[scat_index].len )
2058 {
2059 num_bytes = 0;
2060 scat_index++;
2061 }
2062
2063 num_rings = (int32 *)&scat->elements[scat_index].buf[num_bytes];
2064 num_bytes += sizeof(int32);
2065
2066 rg_info= (ring_info *)&scat->elements[scat_index].buf[num_bytes];
2067
2068 /* Print form_token_header */
2069
2070 Alarmp( SPLOG_PRINT, PRINT, "=========== Form Token ==========\n");
2071
2072 if ( is_form1 )
2073 snprintf(&form_name[0], 10, "FORM 1");
2074 else
2075 snprintf(&form_name[0], 10, "FORM 2");
2076
2077 Alarmp( SPLOG_PRINT, PRINT, "%s Token, sent by %u.%u.%u.%u. Seq: %d\n", form_name, IP1(form_token->transmiter_id), IP2(form_token->transmiter_id), IP3(form_token->transmiter_id), IP4(form_token->transmiter_id), form_token->seq);
2078 Alarmp( SPLOG_PRINT, PRINT, "ProcID: %u.%u.%u.%u\t ARU: %d, ARU LastID: %u.%u.%u.%u\n", IP1(form_token->proc_id), IP2(form_token->proc_id), IP3(form_token->proc_id), IP4(form_token->proc_id), form_token->aru, IP1(form_token->aru_last_id), IP2(form_token->aru_last_id), IP3(form_token->aru_last_id), IP4(form_token->aru_last_id) );
2079 Alarmp( SPLOG_PRINT, PRINT, "FlowControl: %hd\tRTR Len: %hd\n", form_token->flow_control, form_token->rtr_len);
2080 /* Print members list */
2081
2082 Alarmp( SPLOG_PRINT, PRINT, "Form Token members list -- Active (%hd) Pending (%hd)\n", m_info->num_members, m_info->num_pending);
2083 for (i=0; i < m_info->num_members; i++)
2084 {
2085 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
2086 if ( (i % 3) == 2 )
2087 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2088 }
2089
2090 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\nPending Members:\n");
2091
2092 for (i= m_info->num_members; i < ( m_info->num_members + m_info->num_pending); i++)
2093 {
2094 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
2095 if ( (i % 3) == 2 )
2096 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2097 }
2098 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2099
2100 if ( is_form1 )
2101 {
2102 /* Print reps list */
2103 Alarmp( SPLOG_PRINT, PRINT, "Form Token reps list -- Count (%hd) index (%hd)\n", r_info->num_reps, r_info->rep_index);
2104 for (i=0; i < r_info->num_reps; i++)
2105 {
2106 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u (T %hd SegInd %hd) ", i, IP1(r_info->reps[i].proc_id), IP2(r_info->reps[i].proc_id), IP3(r_info->reps[i].proc_id), IP4(r_info->reps[i].proc_id), r_info->reps[i].type, r_info->reps[i].seg_index );
2107 if ( (i % 3) == 2 )
2108 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2109 }
2110 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2111 } else /* so is FORM2 type */
2112 {
2113 Alarmp( SPLOG_PRINT, PRINT, "Form Token Membership ID %u.%u.%u.%u : %d\n", IP1(m_id_info->proc_id), IP2(m_id_info->proc_id), IP3(m_id_info->proc_id), IP4(m_id_info->proc_id), m_id_info->time );
2114 }
2115
2116 /* Print ring info */
2117 Alarmp( SPLOG_PRINT, PRINT, "Form Token RING list -- Count (%d)\n", *num_rings);
2118 for (i=0; i < *num_rings; i++)
2119 {
2120 bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
2121 c_ptr = (char *) rg_info;
2122
2123 Alarmp( SPLOG_PRINT, PRINT, "Ring %u: MembID %u.%u.%u.%u - %u\tTransTime %u\n", i, IP1(rg_info->memb_id.proc_id), IP2(rg_info->memb_id.proc_id), IP3(rg_info->memb_id.proc_id), IP4(rg_info->memb_id.proc_id), rg_info->memb_id.time, rg_info->trans_time);
2124 Alarmp( SPLOG_PRINT, PRINT, "\tARU: %d\tHighSeq: %d\tNumHoles: %d\n", rg_info->aru, rg_info->highest_seq, rg_info->num_holes);
2125 Alarmp( SPLOG_PRINT, PRINT, "\tNumCommit: %hd\tNumTrans: %hd\n", rg_info->num_commit, rg_info->num_trans);
2126 /* Now print all missing messages from this ring (holes) */
2127 commit_id = (int32 *) &c_ptr[sizeof(ring_info)];
2128
2129 Alarmp( SPLOG_PRINT, PRINT, "\tMessage Holes:");
2130 for (j=0; j < rg_info->num_holes; j++)
2131 {
2132 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u ", *commit_id);
2133 commit_id++;
2134 }
2135 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2136
2137 /* Now print transitional member list */
2138 Alarmp( SPLOG_PRINT, PRINT, "\tTrans List:");
2139 for (j=0; j < rg_info->num_trans; j++)
2140 {
2141 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
2142 if ( (j % 3) == 2 )
2143 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2144 commit_id++;
2145 }
2146 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2147
2148 /* Now print commit list. This follows the trans list with no gaps. */
2149 Alarmp( SPLOG_PRINT, PRINT, "\tCommit List:");
2150 for (j=rg_info->num_trans; j < rg_info->num_commit; j++)
2151 {
2152 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
2153 if ( (j % 3) == 2 )
2154 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2155 commit_id++;
2156 }
2157 Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
2158
2159 /* next ring */
2160 rg_info = (ring_info *)&c_ptr[bytes_to_skip];
2161 }
2162
2163 Alarmp( SPLOG_PRINT, PRINT, "====================================================\n");
2164 }
2165
Backoff_membership()2166 static void Backoff_membership()
2167 {
2168 int pack_entry;
2169 int i;
2170
2171 pack_entry=-1;
2172 for( i=Last_discarded+1; i <= Highest_seq; i++ )
2173 {
2174 /* clear dummy messages */
2175 pack_entry = i & PACKET_MASK;
2176 if( Packets[pack_entry].exist == 3 )
2177 Packets[pack_entry].exist = 0;
2178 }
2179
2180 /* return Aru and My_aru */
2181 Aru = Last_discarded;
2182
2183 My_aru = Last_discarded;
2184 for( i=Last_discarded+1; i <= Highest_seq; i++ )
2185 {
2186 if( !Packets[pack_entry].exist ) break;
2187 My_aru++;
2188 }
2189 }
2190
Memb_commit()2191 void Memb_commit()
2192 {
2193 Commit_set = Future_commit_set;
2194 }
2195
Memb_transitional()2196 void Memb_transitional()
2197 {
2198 int i, j, k;
2199 int32u proc_id;
2200
2201 Alarm( MEMB, "Memb_transitional\n");
2202
2203 Transitional = 1;
2204 Trans_membership.num_segments = Cn.num_segments;
2205 for( i=0; i < Cn.num_segments; i++ )
2206 {
2207 Trans_membership.segments[i].num_procs = 0;
2208 for( j=0; j < Cn.segments[i].num_procs; j++ )
2209 {
2210 proc_id = Cn.segments[i].procs[j]->id;
2211 for( k=0; k < Commit_set.num_pending; k++ )
2212 {
2213 if( Commit_set.members[k] == proc_id )
2214 {
2215 if ( Conf_append_id_to_seg( &Trans_membership.segments[i], proc_id) == -1)
2216 Alarm(EXIT, "Memb_transitional: Commit_set has member %u for trans who doesn't exist\n", proc_id);
2217 break;
2218 }
2219 }
2220 }
2221 }
2222
2223 Trans_memb_id.proc_id = Conf_leader( &Trans_membership );
2224 Trans_memb_id.time = F_trans_memb_time;
2225
2226 Commit_membership.num_segments = Cn.num_segments;
2227 for( i=0; i < Cn.num_segments; i++ )
2228 {
2229 Commit_membership.segments[i].num_procs = 0;
2230 for( j=0; j < Cn.segments[i].num_procs; j++ )
2231 {
2232 proc_id = Cn.segments[i].procs[j]->id;
2233 for( k=0; k < Commit_set.num_members; k++ )
2234 {
2235 if( Commit_set.members[k] == proc_id )
2236 {
2237 if ( Conf_append_id_to_seg(&Commit_membership.segments[i], proc_id) == -1)
2238 Alarm(EXIT, "Memb_transitional: Commit_set has member %u who doesn't exist\n", proc_id);
2239 break;
2240 }
2241 }
2242 }
2243 }
2244 }
2245
Memb_regular()2246 void Memb_regular()
2247 {
2248 int i;
2249
2250 Alarm( MEMB, "Memb_regular\n");
2251 Transitional = 0;
2252 State = OP;
2253 GlobalStatus.state = OP;
2254 GlobalStatus.membership_changes++;
2255 Membership = Future_membership;
2256 Membership_id = Future_membership_id;
2257 Reg_membership = Membership;
2258
2259 GlobalStatus.num_procs = 0;
2260 GlobalStatus.num_segments = 0;
2261 for( i=0; i < Membership.num_segments; i++ )
2262 {
2263 if( Membership.segments[i].num_procs > 0 )
2264 {
2265 GlobalStatus.num_procs += Membership.segments[i].num_procs;
2266 GlobalStatus.num_segments++;
2267 }
2268 }
2269 GlobalStatus.leader_id = Membership_id.proc_id;
2270
2271 Foreign_found = 0;
2272 if( Conf_leader( &Membership ) == My.id )
2273 E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
2274 printf("Membership id is ( %d, %d)\n", Membership_id.proc_id, Membership_id.time );
2275 printf("%c", Conf_print( &Membership ) );
2276 }
2277
Flip_members(members_info * members_ptr)2278 void Flip_members( members_info *members_ptr )
2279 {
2280 /*
2281 * This routine can not be called twice beacuse of num_members and num_pending
2282 */
2283 int i;
2284
2285 members_ptr->num_members = Flip_int16( members_ptr->num_members );
2286 members_ptr->num_pending = Flip_int16( members_ptr->num_pending );
2287 for( i=0; i < members_ptr->num_members + members_ptr->num_pending; i++ )
2288 members_ptr->members[i] = Flip_int32( members_ptr->members[i] );
2289 }
2290
Flip_reps(reps_info * reps_ptr)2291 void Flip_reps( reps_info *reps_ptr )
2292 {
2293 /*
2294 * This routine can not be called twice beacuse of num_reps
2295 */
2296 int i;
2297
2298 reps_ptr->num_reps = Flip_int16( reps_ptr->num_reps );
2299 reps_ptr->rep_index = Flip_int16( reps_ptr->rep_index );
2300 for( i=0; i < reps_ptr->num_reps; i++ )
2301 {
2302 reps_ptr->reps[i].proc_id = Flip_int32( reps_ptr->reps[i].proc_id );
2303 reps_ptr->reps[i].type = Flip_int16( reps_ptr->reps[i].type );
2304 reps_ptr->reps[i].seg_index = Flip_int16( reps_ptr->reps[i].seg_index );
2305 }
2306 }
2307
Flip_rings(char * buf)2308 void Flip_rings( char *buf )
2309 {
2310 /*
2311 * This routine can not be called twice beacuse of *num_rings
2312 * and of ring_info_ptr->num_holes
2313 */
2314 ring_info *ring_info_ptr;
2315 int32 *num_rings;
2316 int ptr;
2317 char *c_ptr;
2318 int32 *seq_or_proc;
2319 int i,j;
2320
2321 c_ptr = buf;
2322 ptr = 0;
2323 num_rings = (int32 *)&c_ptr[ptr];
2324
2325 *num_rings = Flip_int32( *num_rings );
2326 ptr += sizeof(int32);
2327
2328 for( i=0; i < *num_rings; i++ )
2329 {
2330 ring_info_ptr = (ring_info *)&c_ptr[ptr];
2331
2332 ring_info_ptr->memb_id.proc_id = Flip_int32( ring_info_ptr->memb_id.proc_id );
2333 ring_info_ptr->memb_id.time = Flip_int32( ring_info_ptr->memb_id.time );
2334 ring_info_ptr->trans_time = Flip_int32( ring_info_ptr->trans_time );
2335 ring_info_ptr->aru = Flip_int32( ring_info_ptr->aru );
2336 ring_info_ptr->highest_seq = Flip_int32( ring_info_ptr->highest_seq );
2337 ring_info_ptr->num_holes = Flip_int32( ring_info_ptr->num_holes );
2338 ring_info_ptr->num_commit = Flip_int16( ring_info_ptr->num_commit );
2339 ring_info_ptr->num_trans = Flip_int16( ring_info_ptr->num_trans );
2340
2341 ptr += sizeof(ring_info);
2342
2343 for( j=0; j < ( ring_info_ptr->num_holes + ring_info_ptr->num_commit ); j++ )
2344 {
2345 seq_or_proc = (int32 *)&c_ptr[ptr];
2346 *seq_or_proc = Flip_int32( *seq_or_proc );
2347 ptr += sizeof(int32);
2348 }
2349 }
2350 }
2351