1 /*
2 * The Spread Toolkit.
3 *
4 * The contents of this file are subject to the Spread Open-Source
5 * License, Version 1.0 (the ``License''); you may not use
6 * this file except in compliance with the License. You may obtain a
7 * copy of the License at:
8 *
9 * http://www.spread.org/license/
10 *
11 * or in the file ``license.txt'' found in this distribution.
12 *
13 * Software distributed under the License is distributed on an AS IS basis,
14 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15 * for the specific language governing rights and limitations under the
16 * License.
17 *
18 * The Creators of Spread are:
19 * Yair Amir, Michal Miskin-Amir, Jonathan Stanton, John Schultz.
20 *
21 * Copyright (C) 1993-2012 Spread Concepts LLC <info@spreadconcepts.com>
22 *
23 * All Rights Reserved.
24 *
25 * Major Contributor(s):
26 * ---------------
27 * Ryan Caudy rcaudy@gmail.com - contributions to process groups.
28 * Claudiu Danilov claudiu@acm.org - scalable wide area support.
29 * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
30 * Theo Schlossnagle jesus@omniti.com - Perl, autoconf, old skiplist.
31 * Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
32 *
33 */
34
35 #include "arch.h"
36 #include <string.h>
37 #include <stdio.h>
38 #include <assert.h>
39
40 #include "spread_params.h"
41 #include "net_types.h"
42 #include "protocol.h"
43 #include "sess_types.h"
44 #include "sess_body.h"
45 #include "groups.h"
46 #include "spu_objects.h"
47 #include "spu_memory.h"
48 #include "spu_events.h"
49 #include "status.h"
50 #include "spu_alarm.h"
51 #include "membership.h"
52 #if ( SPREAD_PROTOCOL > 3 )
53 #include "queues.h"
54 #endif
55 #include "message.h"
56
57 #ifndef NULL
58 #define NULL 0
59 #endif /* NULL */
60
/* An unknown membership id (on a daemon_members struct), defined as having a proc_id of -1,
 * means that the daemon is "partitioned." "Established" is the complement.
 * Since -1 == 255.255.255.255 is the universal broadcast address, this can never
 * conflict with a *real* proc_id. */
#define Is_unknown_memb_id( midp ) ( (midp)->proc_id == -1 )

#define Is_established_daemon( dp ) ( !Is_unknown_memb_id( &((dp)->memb_id) ) )
#define Is_partitioned_daemon( dp ) ( Is_unknown_memb_id( &((dp)->memb_id) ) )

/* Flag values - cp points to a single char flag: set it to 0x01 (first
 * message) or 0x00 (later message), or test which of the two it holds.
 * The argument is fully parenthesized so that a pointer expression such as
 * `buf + offset` is dereferenced as a whole. */
#define Set_first_message( cp ) ( (*(cp)) = 0x01 )
#define Set_later_message( cp ) ( (*(cp)) = 0x00 )
#define Is_first_message( cp )  ( (*(cp)) == 0x01 )
#define Is_later_message( cp )  ( (*(cp)) == 0x00 )

/* IP should be a 32-bit integer, and
 * STR should be a character array of size at least 16
 * (strlen("255.255.255.255") plus the terminating NUL). */
#define IP_to_STR( IP, STR ) snprintf( (STR), 16, "%d.%d.%d.%d", \
	IP1(IP), IP2(IP), IP3(IP), IP4(IP) )

/* The representative of a synced set is the first member.
 * The set is sorted by daemon order in conf. */
#define Is_synced_set_leader( proc_id ) ( (proc_id) == MySyncedSet.proc_ids[0] )
84
/* One buffer of serialized groups state used when building GROUPS messages.
 * Buffers form a singly linked list through `next`; the list is headed by
 * the file-scope pointer Groups_bufs. */
typedef struct dummy_groups_buf_link {
	char buf[GROUPS_BUF_SIZE];                /* serialized payload */
	int bytes;                                /* presumably the number of bytes of buf in use -- TODO confirm */
	struct dummy_groups_buf_link *next;       /* next buffer in the list, or NULL */
} groups_buf_link;
90
/* A set of daemons, identified by proc id, whose group state is in sync.
 * proc_ids[0] is the set's representative (see Is_synced_set_leader), and
 * the set is kept sorted by daemon order in the configuration.
 * Presumably only the first `size` entries of proc_ids are meaningful. */
typedef struct dummy_synced_set {
	int32u size;
	int32 proc_ids[MAX_PROCS_RING];
} synced_set;
95
/* This message link struct enables the messages from a given synced
 * set to be kept together. Entries form a singly linked list through
 * `next`; the file-scope `Gathered` entry serves as the list head. */
typedef struct dummy_groups_message_link {
	int32 rep_proc_id;                              /* presumably the representative proc id of the synced set */
	int complete;                                   /* nonzero once all of this synced set's messages have arrived */
	message_link *first;                            /* retained messages, chained through message_link.next */
	struct dummy_groups_message_link *next;         /* next synced set's entry, or NULL */
} groups_message_link;
104
printgroup(void * vgrp)105 char *printgroup(void *vgrp) {
106 group *grp = (group *)vgrp;
107 return grp->name;
108 }
109
/* Deep copy of the configuration used to order every group's DaemonsList
 * (via G_compare_proc_ids_by_conf). Set in G_init and refreshed in
 * G_signal_conf_reload. */
static configuration Cn_active;
/* Shallow copy of a freshly reloaded configuration; only meaningful while
 * G_signal_conf_reload runs (used by G_compare_proc_ids_by_conf_interim). */
static configuration Cn_interim;
/* Groups-layer state (GOP, GTRANS, GGATHER or GGT); mirrored into GlobalStatus.gstate. */
static int Gstate;
/* Most recent transitional membership and its id (saved by G_handle_trans_memb). */
static configuration Trans_memb;
static membership_id Trans_memb_id;
/* Most recent regular membership and its id (saved by G_handle_reg_memb). */
static configuration Reg_memb;
static membership_id Reg_memb_id;
/* NOTE(review): not referenced in the visible part of this file; presumably a scratch message-body buffer. */
static char Mess_buf[MAX_MESSAGE_BODY_LEN];

/* Prebuilt GROUPS message buffers, sent when this daemon leads its synced set. */
static groups_buf_link *Groups_bufs;
/* Nonzero while Groups_bufs still matches the current group state, so the
 * buffers can be re-stamped and resent instead of rebuilt. */
static int Groups_bufs_fresh;
/* State-exchange counters, reset to 0 when a new exchange starts. */
static int Num_mess_gathered;
static int Num_daemons_gathered;
static groups_message_link Gathered; /* Groups messages: list head; Gathered.next is the first synced set's entry */

/* Groups control down queue: 0 for SPREAD_PROTOCOL 3, otherwise from init_queuesess(). */
static int Groups_control_down_queue;

static stdskl GroupsList; /* (group*) -> nil */
/* TODO: might want to add a "secondary" index of daemon IDs -> groups for potentially faster performance */
/* TODO: might want to add a "secondary" index of member IDs -> groups for potentially faster performance */
/* Daemons whose group state this daemon is synced with; proc_ids[0] is the leader. */
static synced_set MySyncedSet;
static membership_id unknown_memb_id = { -1, -1 }; /* See explanation above. */
132
/* Forward declarations of the file-local helpers.
 * (G_compare_proc_ids_by_conf_interim is deliberately not listed: it is
 * defined before its first use.) */

/* Unused function
 *static int G_id_is_equal( group_id g1, group_id g2 );
 */
static void G_print_group_id( int priority, group_id g, char *func_name );
static group *G_get_group( char *group_name );
static daemon_members *G_get_daemon( group *grp, int32u proc_id );
static member *G_get_member( daemon_members *dmn, char *private_group_name );

static void G_send_lightweight_memb( group *grp, int32 caused, char *private_group_name );
static void G_send_self_leave( group *grp, int ses );
static void G_send_heavyweight_memb( group *grp );
static void G_send_heavyweight_join( group *grp, member *joiner, mailbox new_mbox );
static void G_send_trans_memb( group *grp );
static void G_compute_group_mask( group *grp, char *func_name );

static int G_build_memb_buf( group *grp, message_obj *msg,
                             char buf[], int32 caused );
static int G_build_memb_vs_buf( group *grp, message_obj *msg,
                                char buf[], int32 caused, member *joiner );
static message_link *G_build_trans_mess( group *grp );

static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes );
static int G_build_groups_buf( char buf[], stdit *git, stdit *dit, int first_time );
static void G_build_new_groups_bufs(void);
static int G_send_groups_messages(void);
static void G_stamp_groups_bufs(void);
static void G_discard_groups_bufs(void);
static int G_mess_to_groups( message_link *mess_link, synced_set *sset );
static void G_compute_and_notify(void);
static void G_print(void);
static void G_update_daemon_memb_ids( group *grp );

static void G_add_to_synced_set( synced_set *s );
static void G_update_synced_set( synced_set *s, configuration *m );
static bool G_update_synced_set_status( synced_set *s, configuration *m );
static void G_print_synced_set( int priority, synced_set *s, char *func_name );

static void G_eliminate_partitioned_daemons( group *grp );
static bool G_eliminate_partitioned_daemons_status( group *grp );
static bool G_check_if_changed_by_cascade( group *grp );
static void G_remove_daemon( group *grp, daemon_members *dmn );
static void G_remove_group( group *grp );
static void G_remove_mailbox( group *grp, mailbox m );

/* Comparators (qsort-style: negative / zero / positive), defined below. */
static int G_compare_nameptr(const void *, const void *);
static int G_compare_proc_ids_by_conf( const void *, const void * );
static int G_compare_proc_ids_by_conf_internal( configuration *config, const void *a, const void *b);
static int G_compare_daemon_vs_set( const void *, const void * );

static int G_compare_memb_ids( const membership_id *mid1,
                               const membership_id *mid2 );

static void G_shift_to_GOP( void );
186
G_compare_nameptr(const void * a,const void * b)187 static int G_compare_nameptr(const void *a, const void *b)
188 {
189 return strncmp(*(const char**) a, *(const char**) b, MAX_GROUP_NAME);
190 }
191
G_compare_proc_ids_by_conf(const void * a,const void * b)192 static int G_compare_proc_ids_by_conf(const void *a, const void *b)
193 {
194 return G_compare_proc_ids_by_conf_internal( &Cn_active, a, b);
195 }
196
G_compare_proc_ids_by_conf_interim(const void * a,const void * b)197 static int G_compare_proc_ids_by_conf_interim(const void *a, const void *b)
198 {
199 return G_compare_proc_ids_by_conf_internal( &Cn_interim, a, b);
200 }
201
202 /* Compare two procs based on the specified configuration structure
203 * The ordering is defined by the order the procs occur in the configuration.
204 * If a proc is no longer in the active configuration, it is ordered after all active procs
205 * based on it's IP address.
206 */
G_compare_proc_ids_by_conf_internal(configuration * config,const void * a,const void * b)207 static int G_compare_proc_ids_by_conf_internal( configuration *config, const void *a, const void *b)
208 {
209 proc dummy_proc;
210 const int32u aip = **(const int32u**)a;
211 const int32u bip = **(const int32u**)b;
212
213 int ia = Conf_proc_by_id_in_conf( config, aip, &dummy_proc );
214 int ib = Conf_proc_by_id_in_conf( config, bip, &dummy_proc );
215
216 /* common case */
217 if (ia > -1 && ib > -1) {
218 /* both are active */
219 return ia - ib;
220 }
221 if (ia < 0 && ib < 0 ) {
222 /* both not active so compare IP addresses */
223 if (aip == bip)
224 return 0;
225 else
226 if ( aip < bip )
227 return -1;
228 else
229 return 1;
230
231 } else
232 if (ia < 0)
233 return 1;
234 else
235 if (ib < 0)
236 return -1;
237 else
238 /* dup rule for all are active to keep compiler happy */
239 return ia - ib;
240 }
241
G_compare_daemon_vs_set(const void * a,const void * b)242 static int G_compare_daemon_vs_set(const void *a, const void *b)
243 {
244 const daemon_members *da = *(const daemon_members**) a;
245 const daemon_members *db = *(const daemon_members**) b;
246 int cmp = G_compare_memb_ids(&da->memb_id, &db->memb_id);
247
248 if ( cmp == 0 ) {
249 const int32 * da_proc_id_ptr = &da->proc_id;
250 const int32 * db_proc_id_ptr = &db->proc_id;
251
252 cmp = G_compare_proc_ids_by_conf( &da_proc_id_ptr, &db_proc_id_ptr );
253 }
254
255 return cmp;
256 }
257
258 /* Compares two memb_ids. Arbitrary, but deterministic. A memb_id with a proc_id
259 * of -1 compares after ANY other memb_id, by definition. */
G_compare_memb_ids(const membership_id * mid1,const membership_id * mid2)260 static int G_compare_memb_ids( const membership_id *mid1, const membership_id *mid2 )
261 {
262 int unknown_ids = 0;
263 if( Is_unknown_memb_id( mid1 ) )
264 unknown_ids += 1;
265 if( Is_unknown_memb_id( mid2 ) )
266 unknown_ids += 2;
267 switch( unknown_ids ) {
268 case 0: break;
269 case 1: return 1;
270 case 2: return -1;
271 case 3: return 0;
272 }
273 if( mid1->proc_id != mid2->proc_id )
274 return mid1->proc_id - mid2->proc_id;
275 if( mid1->time != mid2->time )
276 return mid1->time - mid2->time;
277 return 0;
278 }
279
G_shift_to_GOP(void)280 static void G_shift_to_GOP( void )
281 {
282 Gstate = GOP;
283 GlobalStatus.gstate = Gstate;
284 }
285
G_init()286 void G_init()
287 {
288 int ret;
289
290 Alarmp( SPLOG_INFO, GROUPS, "G_init: \n" );
291
292 Num_groups = 0;
293 GlobalStatus.num_groups = Num_groups;
294
295 MySyncedSet.size = 1;
296 MySyncedSet.proc_ids[0] = My.id;
297 G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_init" );
298
299 Cn_active.allprocs = Mem_alloc( MAX_PROCS_RING * sizeof( proc ) );
300 if (Cn_active.allprocs == NULL) {
301 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failed to allocate memory for Cn_active procs array\n");
302 }
303
304 ret = stdskl_construct(&GroupsList, sizeof(group*), 0, G_compare_nameptr);
305 if (ret != 0) {
306 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GroupsList\n");
307 }
308 ret = Mem_init_object(GROUP, "group", sizeof(group), 1000, 0);
309 if (ret < 0)
310 {
311 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GROUP memory objects\n");
312 }
313 ret = Mem_init_object(DAEMON_MEMBERS, "daemon_members", sizeof(daemon_members), 2000, 0);
314 if (ret < 0)
315 {
316 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize DAEMON_MEMBERS memory objects\n");
317 }
318 ret = Mem_init_object(MEMBER, "member", sizeof(member), 10000, 0);
319 if (ret < 0)
320 {
321 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize MEMBER memory objects\n");
322 }
323 ret = Mem_init_object(GROUPS_BUF_LINK, "groups_buf_link", sizeof(groups_buf_link), 1, 1);
324 if( ret < 0 )
325 {
326 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GROUPS_BUF_LINK memory objects\n");
327 }
328 ret = Mem_init_object(GROUPS_MESSAGE_LINK, "groups_message_link", sizeof(groups_message_link), 1, 1);
329 if( ret < 0 )
330 {
331 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GROUPS_MESSAGE_LINK memory objects\n");
332 }
333
334 #if ( SPREAD_PROTOCOL == 3 )
335 Groups_control_down_queue = 0;
336 #else
337 Groups_control_down_queue = init_queuesess(Groups_down_qs);
338 #endif
339
340 Groups_bufs = NULL;
341 Groups_bufs_fresh = 0;
342 Num_mess_gathered = 0;
343 Num_daemons_gathered = 0;
344 Gathered.complete = 0;
345 Gathered.next = NULL;
346
347 Conf_config_copy( Conf_ref(), &Cn_active );
348
349 G_shift_to_GOP();
350 }
351
352
353 /*
354 * Called from Prot_initiate_conf_reload after configuration file is reloaded (potentially with changes to spread configuration)
355 * Needs to update any static-scope variables that depend on current configuration
356 *
357 Algorithm to clean up existing DaemonList skiplists that are stored for every group.
358 Each skiplist needs to be reformed with a different comparison function using the new Config structure.
359 Once they are all reformed, we need to move the temporary daemon lists back into the main GroupsList structure entries.
360
361 Steps:
362 1)
363 make new grp skiplist to store temporary copies of all of the daemon lists.
364 loop over all grps:
365 copy damon list from current skiplist to new skiplist with comparison function (G_compare_proc_ids_by_conf_interim) that uses the new config structure. This maintains the daemon_members in each entry, but inserts them into a new skiplist in a new order based on the new configuration.
366 store new skiplist in the new grp list so we can store all of them with new structures before destroying them all and switching the active Conf
367 end
368 2)
369 loop over all grps:
370 destruct the current daemonlist for all groups
371 end
372 3)
373 Set Config variable to point to new config (old config no longer available)
374 4)
375 loop over all grps:
376 create new daemon skiplsit for each group with normal comparison function (will effectively use new config now)
377 get begin and end iterators for temp daemon skiplist with stdstk_begin() and stdskl_end()
378 call stdstk_insert_seq() to insert contents of temp skiplist into new permantent skiplist
379 destrcut temp skiplist
380 end
381 destruct temp grp skiplist.
382
383 At the end of this function (which runs atomically with regards to any other groups functions):
384 - all of the skiplists with daemons in them will be valid with the new Conf structure and can find the 'old' daemons.
385 - There are no changes to the handling of the groups membership code that runs when the ring is reformed as it can correctly remove the daemons when it thinks it should beauase they are correctly indexed and sorted in the skiplist.
386
387 */
388
G_signal_conf_reload(void)389 void G_signal_conf_reload(void)
390 {
391 int ret;
392 group *grp, *tmp_grp;
393 daemon_members *dmn;
394 stdit git, dit;
395 stdskl tmp_GroupsList;
396
397 ret = stdskl_construct(&tmp_GroupsList, sizeof(group*), 0, G_compare_nameptr);
398 if (ret != 0) {
399 Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GroupsList\n");
400 }
401
402 /*
403 0) Get copy of new configuration to use while we still have old config also.
404 A shallow copy is fine as:
405 - we want the current Conf(),
406 - do not need our own copy of allprocs,
407 - and only need it valid for this function call.
408 */
409
410 Cn_interim = Conf();
411
412 /*
413 1)
414 loop over all grps:
415 copy damon list from current skiplist to new skiplist with new config as comparison fucntion
416 store new skiplist in the new grp list.
417 end
418 */
419 for (stdskl_begin(&GroupsList, &git); !stdskl_is_end(&GroupsList, &git); )
420 {
421 grp = *(group**) stdskl_it_key(&git);
422 stdskl_it_next(&git); /* NOTE: need to do advancement before potential erasure below */
423
424 tmp_grp = new( GROUP );
425 memset( tmp_grp->name, 0, MAX_GROUP_NAME );
426 strcpy( tmp_grp->name, grp->name );
427
428 if (stdskl_construct(&tmp_grp->DaemonsList, sizeof(daemon_members*), 0, G_compare_proc_ids_by_conf_interim) != 0) {
429 Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
430 }
431 tmp_grp->changed = FALSE;
432 tmp_grp->num_members = 0;
433 tmp_grp->grp_id = grp->grp_id;
434
435 if (stdskl_put(&tmp_GroupsList, NULL, &tmp_grp, NULL, STDFALSE) != 0) {
436 Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
437 }
438
439 for (stdskl_begin(&grp->DaemonsList, &dit); !stdskl_is_end(&grp->DaemonsList, &dit); )
440 {
441 dmn = *(daemon_members**) stdskl_it_key(&dit);
442 stdskl_it_next(&dit); /* NOTE: need to do advancement before potential erasure below */
443
444 /* insert this dmn into the new skiplist */
445 if (stdskl_put(&tmp_grp->DaemonsList, NULL, &dmn, NULL, STDFALSE) != 0) {
446 Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
447 }
448 }
449
450 }
451 /*
452 2)
453 loop over all grps:
454 destruct the current daemonlist
455 end
456 */
457 for (stdskl_begin(&GroupsList, &git); !stdskl_is_end(&GroupsList, &git); )
458 {
459 grp = *(group**) stdskl_it_key(&git);
460 stdskl_it_next(&git); /* NOTE: need to do advancement before potential erasure below */
461
462 stdskl_destruct( &grp->DaemonsList);
463 }
464
465 /* 3)
466 Set Cn_active variable to point to new config (old config no longer available)
467 This has to be a deep copy because it must preserve all of the procs array when the configuration is next reloaded
468 and the configuration.c version of Conf changes.
469 */
470 Conf_config_copy( Conf_ref(), &Cn_active );
471
472 /* 4)
473 loop over all grps:
474 create new daemon skiplsit for each group with normal comparison function (will effectively use new config now)
475 get begin and end iterators for temp daemon skiplist with stdstk_begin() and stdskl_end()
476 call stdstk_insert_seq() to insert contents of temp skiplist into new permantent skiplist
477 destruct temp skiplist
478 free temp grp structure
479 end
480 destruct temp grp skiplist.
481 */
482
483 for (stdskl_begin(&GroupsList, &git); !stdskl_is_end(&GroupsList, &git); )
484 {
485 stdit bskl, eskl, it;
486
487 grp = *(group**) stdskl_it_key(&git);
488 stdskl_it_next(&git); /* NOTE: need to do advancement before potential erasure below */
489
490 if (stdskl_construct(&grp->DaemonsList, sizeof(daemon_members*), 0, G_compare_proc_ids_by_conf) != 0) {
491 Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
492 }
493
494 /* find tmp_grp corresponding to grp */
495 stdskl_find(&tmp_GroupsList, &it, &grp);
496 if (stdskl_is_end(&tmp_GroupsList, &it)) {
497 Alarmp( SPLOG_FATAL, GROUPS, "G_signal_conf_reload: failed to find group (%s) in tmp_GroupsList\n", grp->name);
498 }
499 tmp_grp = *(group**) stdskl_it_key(&it);
500
501 stdskl_begin(&tmp_grp->DaemonsList, &bskl);
502 stdskl_end(&tmp_grp->DaemonsList, &eskl);
503 stdskl_begin(&grp->DaemonsList, &dit);
504 /* Insert entire tmp_grp->DaemonsList (from begin to last) into grp->DaemonsList */
505 stdskl_insert_seq(&grp->DaemonsList, &dit, &bskl, &eskl, STDTRUE);
506
507 /* destroy interim DaemonList since we are done with it */
508 stdskl_erase(&tmp_GroupsList, &it);
509 stdskl_destruct( &tmp_grp->DaemonsList);
510 dispose(tmp_grp);
511 }
512
513 if ( ! stdskl_empty( &tmp_GroupsList) ) {
514 Alarmp( SPLOG_FATAL, GROUPS, "G_signal_conf_reload: About to destroy temporary GroupsList but it isn't empty.\n");
515 }
516 stdskl_destruct( &tmp_GroupsList );
517
518 }
519
520
/* Handles delivery of a new regular membership (reg_memb, identified by
 * reg_memb_id) to the groups layer.
 *
 * This is valid only in GTRANS or GGT. Being called in GOP or GGATHER is
 * reported with SPLOG_FATAL.
 *
 * GTRANS:
 *   Saves Reg_memb / Reg_memb_id. If the preceding transitional membership
 *   has as many daemons as reg_memb (a purely subtractive change), then for
 *   every changed group:
 *   - partitioned daemons are removed (empty groups are discarded);
 *   - the group id becomes (Reg_memb_id, 1);
 *   - local members are notified;
 *   and the state returns to GOP without any state exchange.
 *   Otherwise:
 *   - changed groups lose their partitioned daemons;
 *   - the session threshold is raised and the GROUPS down queue installed;
 *   - the synced-set leader builds and sends its GROUPS messages;
 *   - the state moves to GGATHER.
 *
 * GGT:
 *   Saves Reg_memb / Reg_memb_id. If this daemon's own synced set had
 *   delivered all of its GROUPS messages (Gathered.complete), every synced
 *   set whose messages are also complete has its messages merged via
 *   G_mess_to_groups and is added to MySyncedSet. All retained messages are
 *   disposed either way. Groups and MySyncedSet are then checked against
 *   Trans_memb. The leader re-stamps its GROUPS buffers (if still fresh) or
 *   rebuilds them, and resends. The state moves to GGATHER.
 */
void G_handle_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
	group *grp;
	groups_message_link *grp_mlink;
	message_link *mess_link;
	synced_set sset;          /* filled by G_mess_to_groups for one synced set at a time */
	int ret;
	char ip_string[16];
	bool group_changed, synced_set_changed;
	stdit it;

	IP_to_STR( reg_memb_id.proc_id, ip_string );
	Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb: with (%s, %d) id\n",
		ip_string, reg_memb_id.time );

	switch( Gstate )
	{
	case GOP:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_reg_memb in GOP\n");

		break;

	case GTRANS:
		/*
		 * Save reg_memb and reg_memb_id
		 * if previous Trans_memb is equal to reg_memb then:
		 *     (Purely subtractive membership change!!)
		 *     for every changed group
		 *         eliminate partitioned daemons
		 *         set Grp_id to (reg_memb_id, 1)
		 *         notify local members of regular membership
		 *     Shift to GOP
		 * else
		 *     for every changed group
		 *         eliminate partitioned members
		 *     Replace protocol queue, raise event threshold
		 *     If I'm representative of synced set
		 *         Build groups messages --
		 *             contains only members local to daemons in SyncedSet
		 *         Send groups messages
		 *     Shift to GGATHER
		 */
		Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb in GTRANS\n");

		Reg_memb = reg_memb;
		Reg_memb_id = reg_memb_id;

		/* "Equal" is tested by daemon count: the regular membership is
		 * considered the same as the transitional one when the sizes match. */
		if( Conf_num_procs( &Trans_memb ) == Conf_num_procs( &Reg_memb ) )
		{
			for (stdskl_begin(&GroupsList, &it); !stdskl_is_end(&GroupsList, &it); )
			{
				grp = *(group**) stdskl_it_key(&it);
				stdskl_it_next(&it); /* NOTE: need to do advancement before potential erasure below */

				if( grp->changed )
				{
					/* The group has changed */
					/* eliminating partitioned daemons */
					G_eliminate_partitioned_daemons( grp );
					if( grp->num_members == 0 )
					{
						/* discard this empty group */
						G_remove_group( grp );
					}else{
						Alarmp( SPLOG_INFO, GROUPS,
							"G_handle_reg_memb: no state transfer needed for group %s.\n",
							grp->name );
						Alarmp( SPLOG_DEBUG, GROUPS,
							"G_handle_reg_memb: changing group_id for group %s\n", grp->name );
						G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_reg_memb" );
						grp->grp_id.memb_id = Reg_memb_id;
						grp->grp_id.index = 1;
						grp->changed = FALSE;
						G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_reg_memb" );
						G_update_daemon_memb_ids( grp );
						/* Only notify if there are local members (mailboxes) */
						if( !stdarr_empty(&grp->mboxes) )
						{
							G_send_heavyweight_memb( grp );
						}
					}
				}
			}

			G_shift_to_GOP();

		} else {
			/*
			 * else
			 *     for every changed group
			 *         eliminate partitioned members
			 *     Replace protocol queue, raise event threshold
			 *     If I'm representative of synced set
			 *         Build groups messages --
			 *             contains only members local to daemons in SyncedSet
			 *         Send groups messages
			 *     Shift to GGATHER
			 */


			for (stdskl_begin(&GroupsList, &it); !stdskl_is_end(&GroupsList, &it); )
			{
				grp = *(group**) stdskl_it_key(&it);
				stdskl_it_next(&it); /* NOTE: need to do advancement before potential erasure below */

				if( grp->changed )
				{
					/* The group has changed */
					/* eliminating partitioned members */
					G_eliminate_partitioned_daemons( grp );
					if( grp->num_members == 0 )
					{
						/* discard this empty group */
						G_remove_group( grp );
					}
				}
			}
			/* raise events threshold */
			Session_threshold = MEDIUM_PRIORITY;
			Sess_set_active_threshold();

			/* Replace down queue */
			Prot_set_down_queue( GROUPS_DOWNQUEUE );

			/* If I'm the head of my synced set, I send one or more GROUPS messages.
			 * No daemon's data about members for a given group is ever split across
			 * multiple messages. As an optimization, only the last message is sent
			 * AGREED, and all previous messages are sent RELIABLE. G_handle_groups depends
			 * on this to determine when it has all the messages it is waiting for. */
			if( Is_synced_set_leader(My.id) ) {
				G_build_new_groups_bufs();
				Groups_bufs_fresh = 1;
				ret = G_send_groups_messages();
				Alarmp( SPLOG_INFO, GROUPS,
					"G_handle_reg_memb: %d GROUPS messages sent in GTRANS\n", ret );
			}

			Gstate = GGATHER;
			GlobalStatus.gstate = Gstate;
		}
		break;

	case GGATHER:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_reg_memb in GGATHER\n");

		break;

	case GGT:
		/*
		 * Save reg_memb and reg_memb_id
		 * If I received all of my synced set's group messages
		 *     For all synced sets for which I have all the Groups messages
		 *         Extract group information from Groups messages
		 *         Add the synced set to mine
		 * Clear all retained Groups messages
		 * Check the groups state against the last delivered Trans_memb
		 * Check my synced set against Trans_memb
		 * If I'm representative of synced set
		 *     If Groups bufs are still good (i.e. we didn't change anything)
		 *         Stamp Groups messages with current membership id
		 *     Else
		 *         Remove all Groups bufs
		 *         Build groups messages --
		 *             contains only members local to daemons in SyncedSet
		 *     Send groups messages
		 * Shift to GGATHER
		 */
		Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb in GGT\n");

		Reg_memb = reg_memb;
		Reg_memb_id = reg_memb_id;

		/* If our messages have all arrived, then we can bring into our
		 * synced set anyone whose messages have all arrived.
		 * Regardless, we need to dispose of the groups messages.
		 * Each grp_mlink (one per synced set) is unlinked from Gathered and
		 * freed as we go, so Gathered is empty when this loop ends. */
		for( grp_mlink = Gathered.next; grp_mlink != NULL; )
		{
			for( mess_link = grp_mlink->first; mess_link != NULL; )
			{
				if( Gathered.complete && grp_mlink->complete )
				{
					ret = G_mess_to_groups( mess_link, &sset );
					if( ret < 0 )
						Alarmp( SPLOG_FATAL, GROUPS, "G_handle_reg_memb:"
							" G_mess_to_groups errored %d\n", ret );
					/* merged state changed our groups; prebuilt buffers are stale */
					Groups_bufs_fresh = 0;
				}
				grp_mlink->first = mess_link->next;
				Sess_dispose_message( mess_link );
				mess_link = grp_mlink->first;
			}
			if( Gathered.complete && grp_mlink->complete )
				G_add_to_synced_set( &sset );
			Gathered.next = grp_mlink->next;
			dispose( grp_mlink );
			grp_mlink = Gathered.next;
		}

		/* We put off really handling the transitional configuration until now
		 * so as to not deliver potentially inconsistent groups messages
		 * if we completed the old state exchange. Now, prepare for the next one.
		 */
		for (stdskl_begin(&GroupsList, &it); !stdskl_is_end(&GroupsList, &it); )
		{
			grp = *(group**) stdskl_it_key(&it);
			stdskl_it_next(&it); /* NOTE: need to do advancement before potential erasure below */

			group_changed = G_eliminate_partitioned_daemons_status( grp );
			if( group_changed )
			{
				Groups_bufs_fresh = 0;
				if( grp->num_members == 0 )
				{
					/* discard this empty group */
					G_remove_group( grp );
				} else {
					grp->changed = TRUE;
				}
			}
		}
		synced_set_changed = G_update_synced_set_status( &MySyncedSet, &Trans_memb );
		G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_handle_reg_memb" );
		/* Since one of the groups bufs holds the synced set... */
		if( synced_set_changed )
			Groups_bufs_fresh = 0;

		/* Start a fresh state exchange. */
		Gathered.complete = 0;
		Num_mess_gathered = 0;
		Num_daemons_gathered = 0;

		if( Is_synced_set_leader(My.id) ) {
			/* Stamp own Groups message in buffer with current membership id */
			if( Groups_bufs_fresh ) {
				G_stamp_groups_bufs();
			} else {
				G_discard_groups_bufs();
				G_build_new_groups_bufs();
				Groups_bufs_fresh = 1;
			}
			ret = G_send_groups_messages();
			Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb: %d GROUPS messages"
				" sent in GGT\n", ret );
		}

		Gstate = GGATHER;
		GlobalStatus.gstate = Gstate;

		break;
	}
}
770
/* Handles delivery of a transitional membership (trans_memb, identified by
 * trans_memb_id) to the groups layer.
 *
 * This is valid only in GOP or GGATHER. Being called in GTRANS or GGT is
 * reported with SPLOG_FATAL.
 *
 * GOP:
 *   Saves Trans_memb / Trans_memb_id and marks every daemon not in
 *   trans_memb as partitioned (memb_id = unknown_memb_id). For each group
 *   that lost a daemon, it:
 *   - sends local members (if any) a transitional notification;
 *   - sets the group id to (Trans_memb_id, 1);
 *   - marks the group changed;
 *   - refreshes the daemons' membership ids.
 *   The state then moves to GTRANS and MySyncedSet is updated against
 *   Trans_memb.
 *
 * GGATHER:
 *   Saves Trans_memb / Trans_memb_id and marks groups changed when
 *   G_check_if_changed_by_cascade reports a change. The state moves to GGT.
 *   Removing partitioned daemons is deferred to G_handle_reg_memb.
 */
void G_handle_trans_memb( configuration trans_memb, membership_id trans_memb_id )
{
	group *grp;
	daemon_members *dmn;
	bool group_changed;
	char ip_string[16];
	stdit git, dit;

	IP_to_STR( trans_memb_id.proc_id, ip_string );
	Alarmp( SPLOG_INFO, GROUPS, "G_handle_trans_memb: with (%s, %d) id\n",
		ip_string, trans_memb_id.time );

	switch( Gstate )
	{
	case GOP:
		/*
		 * Save transitional membership
		 * For every group that has daemons that are not in the trans_memb do:
		 *     mark group daemons that are not in trans_memb as partitioned.
		 *     notify local members with an empty transitional group mess.
		 *     mark group as changed
		 *     update group ID and daemon membership IDs with Trans_memb_id
		 * Shift to GTRANS
		 */
		Alarmp( SPLOG_INFO, GROUPS, "G_handle_trans_memb in GOP\n");

		Trans_memb = trans_memb;
		Trans_memb_id = trans_memb_id;

		for (stdskl_begin(&GroupsList, &git); !stdskl_is_end(&GroupsList, &git); )
		{
			grp = *(group**) stdskl_it_key(&git);
			stdskl_it_next(&git); /* NOTE: need to do advancement before potential erasure below */

			group_changed = FALSE;

			for (stdskl_begin(&grp->DaemonsList, &dit); !stdskl_is_end(&grp->DaemonsList, &dit); )
			{
				dmn = *(daemon_members**) stdskl_it_key(&dit);
				stdskl_it_next(&dit); /* NOTE: need to do advancement before potential erasure below */

				if( Conf_id_in_conf( &Trans_memb, dmn->proc_id ) == -1 )
				{
					/* mark this daemon as partitioned - proc no longer in membership */
					dmn->memb_id = unknown_memb_id;
					group_changed = TRUE;
				}
			}
			if( group_changed )
			{
				/* Only notify if there are local members (mailboxes) */
				if( !stdarr_empty(&grp->mboxes) )
				{
					G_send_trans_memb( grp );
				}
				Alarmp( SPLOG_DEBUG, GROUPS, "G_handle_trans_memb: changed group %s in GOP,"
					" change group id \n", grp->name );
				G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_trans_memb" );
				grp->grp_id.memb_id = Trans_memb_id;
				grp->grp_id.index = 1;
				grp->changed = TRUE;
				G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_trans_memb" );
				/* Here, we mark everyone who we are synced with as having the new memb_id
				 * Specifically, that is, everyone who isn't partitioned from us now in the GroupsList */
				G_update_daemon_memb_ids( grp );
			}
		}

		Gstate = GTRANS;
		GlobalStatus.gstate = Gstate;

		G_update_synced_set( &MySyncedSet, &Trans_memb );
		G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_handle_trans_memb" );

		break;

	case GTRANS:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_trans_memb in GTRANS\n");

		break;

	case GGATHER:
		/*
		 * Save transitional membership
		 * For every group that has members that are not in the
		 * trans_memb do:
		 *     If group has daemons not in the transitional conf
		 *         mark it as changed (it might be already changed, but its ok)
		 *
		 * In order to correctly handle the case in which we complete the state
		 * exchange from the *last* reg memb, don't throw out information here
		 * but rather when we get the next reg memb.
		 *
		 * Shift to GGT
		 *
		 * Note: there is no need to notify local members with a transitional group mess
		 * because no message will come between the trans group memb and the next reg group memb.
		 * Note: this cascading deletes of members that are not in transitional membership actually
		 * opens the door for implementation of the ERSADS97 algorithm.
		 */
		Alarmp( SPLOG_INFO, GROUPS, "G_handle_trans_memb in GGATHER\n");

		Trans_memb = trans_memb;
		Trans_memb_id = trans_memb_id; /* Need these either in G_handle_groups, or in
		                                * G_handle_reg_memb, depending on whether we
		                                * complete state transfer for the current Reg_memb */

		/* We don't actually *need* to mark these groups as changed in the loop below.
		 * It will cause some groups that were not changed by the last membership
		 * to receive notifications anyway if we do complete the state exchange in GGT
		 * Pros: This avoids a small loss of information about who received which messages.
		 * Cons: This isn't strictly required by EVS.
		 */

		for (stdskl_begin(&GroupsList, &git); !stdskl_is_end(&GroupsList, &git); )
		{
			grp = *(group**) stdskl_it_key(&git);
			stdskl_it_next(&git); /* NOTE: need to do advancement before potential erasure below */

			group_changed = G_check_if_changed_by_cascade( grp );
			if( group_changed ) {
				grp->changed = TRUE;
			}
		}

		Gstate = GGT;
		GlobalStatus.gstate = Gstate;

		break;

	case GGT:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_trans_memb in GGT\n");

		break;
	}
}
906
/*
 * G_handle_join: apply the join of member private_group_name to group group_name.
 *
 * Only handled in GOP and GTRANS. A join seen in GGATHER or GGT is reported with
 * SPLOG_FATAL. Two cases are logged with SPLOG_ERROR and otherwise ignored:
 * a private group whose daemon name is not in the configuration, and a member
 * that is already in the group.
 */
void G_handle_join( char *private_group_name, char *group_name )
{
	group *grp, *new_grp;
	daemon_members *dmn, *new_dmn;
	member *mbr, *new_mbr;
	char proc_name[MAX_PROC_NAME];
	char private_name[MAX_PRIVATE_NAME+1];
	int new_p_ind;
	proc new_p;
	int ses;
	mailbox new_mbox;

	Alarmp( SPLOG_INFO, GROUPS, "G_handle_join: %s joins group %s\n", private_group_name, group_name );

	switch( Gstate )
	{
	case GOP:
	case GTRANS:

		if (Gstate == GOP) Alarmp( SPLOG_INFO, GROUPS, "G_handle_join in GOP\n");
		if (Gstate == GTRANS) Alarmp( SPLOG_INFO, GROUPS, "G_handle_join in GTRANS\n");

		/*
		 * Find the group being joined
		 * If group doesn't exist, then add it [unchanged, with gid index 0]
		 * If in GOP, grp_id.memb_id is Reg_memb_id
		 * If in GTRANS, grp_id.memb_id is Trans_memb_id
		 * Find the daemon
		 * If daemon [for this group] doesn't exist, then add it
		 * Record partitioned status when adding, based on Gstate and Trans_memb
		 * Find or create the member
		 * If already in group then ignore (return)
		 * Initialize member object
		 * Increment group size
		 * Increment group ID
		 * Add mbox for local joiner
		 * Mark group as changed if daemon is partitioned
		 *
		 * If the group is changed
		 * Notify all local members of a regular membership caused by network
		 * Note: this regular membership lists the joiner in a separate vs_set from
		 * its daemon's other members
		 * Notify all local members of a transitional membership
		 * Else (if the group isn't changed)
		 * Notify all local members of a regular membership caused by join
		 * Update the group mask (not needed for Spread 3)
		 */
		/* Split the private group name into the session's private name and its daemon's name. */
		G_private_to_names( private_group_name, private_name, proc_name );

		new_p_ind = Conf_proc_by_name( proc_name, &new_p );
		if( new_p_ind < 0 )
		{
			Alarmp( SPLOG_ERROR, GROUPS, "G_handle_join: illegal proc_name %s in private_group %s \n",
				proc_name, private_group_name );
			return;
		}
		grp = G_get_group( group_name );
		if( grp == NULL )
		{
			new_grp = new( GROUP );
			memset( new_grp->name, 0, MAX_GROUP_NAME );
			strcpy( new_grp->name, group_name );

			if (stdskl_construct(&new_grp->DaemonsList, sizeof(daemon_members*), 0, G_compare_proc_ids_by_conf) != 0) {
				Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
			}

			if (stdarr_construct(&new_grp->mboxes, sizeof(mailbox), 0) != 0) {
				Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
			}

			/* NOTE: Older versions of groups do mark a new group as changed if it's
			 * created in GTRANS. This is only needed if the joiner is partitioned
			 * from us [handled below]. */
			new_grp->changed = FALSE;
			if( Gstate == GOP) {
				new_grp->grp_id.memb_id = Reg_memb_id;

			} else { /* Gtrans */
				new_grp->grp_id.memb_id = Trans_memb_id;
			}
			new_grp->grp_id.index = 0; /* This will be incremented to 1, below. */
			Alarmp( SPLOG_DEBUG, GROUPS, "G_handle_join: New group added with group id:\n" );
			G_print_group_id( SPLOG_DEBUG, new_grp->grp_id, "G_handle_join" );

			new_grp->num_members = 0;

			if (stdskl_put(&GroupsList, NULL, &new_grp, NULL, STDFALSE) != 0) {
				Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
			}

			Num_groups++;
			GlobalStatus.num_groups = Num_groups;
			grp = new_grp;
		}

		dmn = G_get_daemon( grp, new_p.id );
		if( dmn == NULL ) {
			new_dmn = new( DAEMON_MEMBERS );
			new_dmn->proc_id = new_p.id;

			if (stdskl_construct(&new_dmn->MembersList, sizeof(member*), 0, G_compare_nameptr) != 0) {
				Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
			}

			/* Are we partitioned from this daemon? */
			/* In GOP every daemon is established. In GTRANS, a daemon outside
			 * Trans_memb gets unknown_memb_id, which marks it as partitioned. */
			if( Gstate == GOP || ( Conf_id_in_conf( &Trans_memb, new_p.id ) != -1 ) ) {
				new_dmn->memb_id = grp->grp_id.memb_id;
			} else {
				new_dmn->memb_id = unknown_memb_id;
			}

			if (stdskl_put(&grp->DaemonsList, NULL, &new_dmn, NULL, STDFALSE) != 0) {
				Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
			}

			dmn = new_dmn;
		}

		mbr = G_get_member( dmn, private_group_name );
		if( mbr != NULL )
		{
			Alarmp( SPLOG_ERROR, GROUPS, "G_handle_join: %s is already in group %s\n",
				private_group_name, group_name );
			return;
		}
		/* Add a new member */
		new_mbr = new( MEMBER );
		memset( new_mbr->name, 0, MAX_GROUP_NAME );
		strcpy( new_mbr->name, private_group_name );

		if (stdskl_put(&dmn->MembersList, NULL, &new_mbr, NULL, STDFALSE) != 0) {
			Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
		}

		grp->num_members++;
		grp->grp_id.index++;

		/* if member is local then add mbox */
		if( dmn->proc_id == My.id )
		{
			ses = Sess_get_session( private_name );
			if( ses < 0 ) Alarmp( SPLOG_FATAL, GROUPS, "G_handle_join: local session does not exist\n" );

			new_mbox = Sessions[ ses ].mbox;

			/* TODO: do we need to ensure the insert is
			   unique or is that externally implied? Seems
			   to be implied by above checks. */

			if (stdarr_push_back(&grp->mboxes, &new_mbox) != 0) {
				Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
			}
		} else
			/* -1 tells G_send_heavyweight_join that there is no local joiner mailbox. */
			new_mbox = -1;

		/* A joiner on a partitioned daemon makes the group changed. */
		if( Is_partitioned_daemon( dmn ) && !grp->changed )
			grp->changed = TRUE;

		/* Notify only when there is at least one local member to notify. */
		if( !stdarr_empty(&grp->mboxes) ) {
			if( grp->changed )
			{
				G_send_heavyweight_join( grp, new_mbr, new_mbox );
				G_send_trans_memb( grp );
			} else {
				G_send_lightweight_memb( grp, CAUSED_BY_JOIN, new_mbr->name );
			}
		}

		/* Compute the mask */
		G_compute_group_mask( grp, "G_handle_join" );

		break;

	case GGATHER:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_join in GGATHER\n");

		break;

	case GGT:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_join in GGT\n");

		break;
	}
}
1092
/*
 * G_handle_leave: apply the voluntary leave of member private_group_name from
 * group group_name.
 *
 * Only handled in GOP and GTRANS. A leave seen in GGATHER or GGT is reported
 * with SPLOG_FATAL. Unknown daemons, groups and members are logged with
 * SPLOG_ERROR and ignored. A local leaver first receives a self-leave
 * notification and has its mailbox removed from the group.
 */
void G_handle_leave( char *private_group_name, char *group_name )
{

	char proc_name[MAX_PROC_NAME];
	char private_name[MAX_PRIVATE_NAME+1];
	char departing_private_group_name[MAX_GROUP_NAME];
	int p_ind;
	proc p;
	int ses;
	group *grp;
	daemon_members *dmn;
	member *mbr;
	stdit it;

	Alarmp( SPLOG_INFO, GROUPS, "G_handle_leave: %s leaves group %s\n", private_group_name, group_name );

	switch( Gstate )
	{
	case GOP:
	case GTRANS:

		if (Gstate == GOP) Alarmp( SPLOG_INFO, GROUPS, "G_handle_leave in GOP\n");
		if (Gstate == GTRANS) Alarmp( SPLOG_INFO, GROUPS, "G_handle_leave in GTRANS\n");

		/*
		 * If not already in group then ignore
		 * If this member is local, notify it [Self Leave] and extract its mbox
		 * Extract this member from group (and discard empty daemons/groups)
		 * Increment Grp_id
		 * If the group is changed, then:
		 * Notify all local members of a regular membership caused by network
		 * with all ESTABLISHED members in the vs_set
		 * Notify all local members of a transitional membership
		 * If the group is unchanged (in GOP all groups are unchanged) then:
		 * Notify all local members of a regular membership caused by leave
		 * Update the group mask
		 */
		G_private_to_names( private_group_name, private_name, proc_name );
		p_ind = Conf_proc_by_name( proc_name, &p );
		if( p_ind < 0 )
		{
			Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: illegal proc_name %s in private_group %s \n",
				proc_name, private_group_name );
			return;
		}
		grp = G_get_group( group_name );
		if( grp == NULL )
		{
			Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: group %s does not exist\n",
				group_name );
			return;
		}
		dmn = G_get_daemon( grp, p.id );
		if( dmn == NULL )
		{
			Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: daemon %s doesn't exist in group %s\n",
				proc_name, group_name );
			return;
		}
		mbr = G_get_member( dmn, private_group_name );
		if( mbr == NULL )
		{
			Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: member %s does not exist in daemon/group %s/%s\n",
				private_group_name, proc_name, group_name );
			return;
		}

		if( p.id == My.id )
		{
			/* notify this local member and extract its mbox from group */
			ses = Sess_get_session( private_name );

			if (ses < 0) {
				Alarmp( SPLOG_FATAL, GROUPS, "G_handle_leave: member '%s' has no local session in group '%s'\n", private_name, grp->name );
			}

			G_send_self_leave( grp, ses );
			G_remove_mailbox( grp, Sessions[ ses ].mbox );
		}

		/* extract this member from group */
		/* Keep a copy of the name: mbr is disposed below, but the name is still
		 * needed for the CAUSED_BY_LEAVE notification. */
		memcpy( departing_private_group_name, mbr->name, MAX_GROUP_NAME );

		if (stdskl_is_end(&dmn->MembersList, stdskl_find(&dmn->MembersList, &it, &mbr))) {
			Alarmp( SPLOG_FATAL, GROUPS, "G_handle_leave: couldn't extract member(%s) from MembersList!\n", mbr->name );
		}

		stdskl_erase(&dmn->MembersList, &it);

		dispose(mbr);
		grp->num_members--;
		/* A daemon with no remaining members in this group is dropped from it. */
		if( stdskl_empty(&dmn->MembersList) )
		{
			G_remove_daemon( grp, dmn );
		}
		if( grp->num_members == 0 )
		{
			/* discard this empty group */
			G_remove_group( grp );
			return;
		}

		/* Increment group id */
		grp->grp_id.index++;

		/* Note: Groups become changed because they include partitioned daemons.
		 * We never need to mark a group as changed here, or in G_handle_kill.
		 */

		if( !stdarr_empty(&grp->mboxes) )
		{
			if( grp->changed )
			{
				G_send_heavyweight_memb( grp );
				G_send_trans_memb( grp );
			} else {
				G_send_lightweight_memb( grp, CAUSED_BY_LEAVE, departing_private_group_name );
			}
		}

		/* Compute the mask */
		G_compute_group_mask( grp, "G_handle_leave" );

		break;

	case GGATHER:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_leave in GGATHER\n");

		break;

	case GGT:
		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_leave in GGT\n");

		break;
	}
}
1229
G_handle_kill(char * private_group_name)1230 void G_handle_kill( char *private_group_name )
1231 {
1232 char proc_name[MAX_PROC_NAME];
1233 char private_name[MAX_PRIVATE_NAME+1];
1234 char departing_private_group_name[MAX_GROUP_NAME];
1235 int p_ind;
1236 proc p;
1237 group *grp;
1238 daemon_members *dmn;
1239 member *mbr;
1240 int ses = -1; /* Fool compiler */
1241 stdit it, tit;
1242
1243 Alarmp( SPLOG_INFO, GROUPS, "G_handle_kill: %s is killed\n", private_group_name );
1244
1245 switch( Gstate )
1246 {
1247 case GOP:
1248 case GTRANS:
1249
1250 if (Gstate == GOP) Alarmp( SPLOG_INFO, GROUPS, "G_handle_kill in GOP\n");
1251 if (Gstate == GTRANS) Alarmp( SPLOG_INFO, GROUPS, "G_handle_kill in GTRANS\n");
1252
1253 /*
1254 * For every group this guy is a member of
1255 * Extract this member from group (and discard empty daemons/groups)
1256 * Increment Grp_id
1257 * If the group is changed, then:
1258 * Notify all local members of a regular membership caused by network
1259 * with all ESTABLISHED members in the vs_set
1260 * Notify all local members of a transitional membership
1261 * If the group is unchanged (in GOP all groups are unchanged) then:
1262 * Notify all local members of a regular membership caused by disconnect
1263 * Update the group mask
1264 */
1265 G_private_to_names( private_group_name, private_name, proc_name );
1266 p_ind = Conf_proc_by_name( proc_name, &p );
1267 if( p_ind < 0 )
1268 {
1269 Alarmp( SPLOG_ERROR, GROUPS, "G_handle_kill: illegal proc_name %s in private_group %s \n",
1270 proc_name, private_group_name );
1271 return;
1272 }
1273
1274 if( p.id == My.id ) ses = Sess_get_session( private_name ); /* FIXME: check for negative answer and error? */
1275
1276 for (stdskl_begin(&GroupsList, &it); !stdskl_is_end(&GroupsList, &it); )
1277 {
1278 grp = *(group**) stdskl_it_key(&it);
1279 stdskl_it_next(&it); /* NOTE: need to do advancement before potential erasure below */
1280
1281 dmn = G_get_daemon( grp, p.id );
1282 if( dmn == NULL ) continue; /* member's daemon not in group */
1283 mbr = G_get_member( dmn, private_group_name );
1284 if( mbr == NULL ) continue; /* no such member in that group */
1285
1286 /* Extract this member from group */
1287 if( p.id == My.id )
1288 {
1289 G_remove_mailbox( grp, Sessions[ ses ].mbox );
1290 }
1291 memcpy( departing_private_group_name, mbr->name, MAX_GROUP_NAME );
1292
1293 if (stdskl_is_end(&dmn->MembersList, stdskl_find(&dmn->MembersList, &tit, &mbr))) {
1294 Alarmp( SPLOG_FATAL, GROUPS, "G_handle_kill: unable to extract member(%s) from MembersList!\n", private_group_name );
1295 }
1296
1297 stdskl_erase(&dmn->MembersList, &tit);
1298
1299 dispose(mbr);
1300 grp->num_members--;
1301 if( stdskl_empty(&dmn->MembersList) )
1302 {
1303 G_remove_daemon( grp, dmn );
1304 }
1305 if( grp->num_members == 0 )
1306 {
1307 /* discard this empty group */
1308 G_remove_group( grp );
1309 continue;
1310 }
1311
1312 /* Increment group id */
1313 grp->grp_id.index++;
1314
1315 if( !stdarr_empty(&grp->mboxes) )
1316 {
1317 if( grp->changed )
1318 {
1319 G_send_heavyweight_memb( grp );
1320 G_send_trans_memb( grp );
1321 } else {
1322 G_send_lightweight_memb( grp, CAUSED_BY_DISCONNECT,
1323 departing_private_group_name );
1324 }
1325 }
1326
1327 /* Compute the mask */
1328 G_compute_group_mask( grp, "G_handle_kill" );
1329 }
1330 break;
1331
1332 case GGATHER:
1333 Alarmp( SPLOG_FATAL, GROUPS, "G_handle_kill in GGATHER\n");
1334
1335 break;
1336
1337 case GGT:
1338 Alarmp( SPLOG_FATAL, GROUPS, "G_handle_kill in GGT\n");
1339
1340 break;
1341 }
1342 }
1343
/*
 * G_send_lightweight_memb: deliver a regular membership message for grp to
 * every local mailbox of the group whose session is a membership session.
 *
 * After the member list and group id built by G_build_memb_buf, the payload
 * holds exactly one vs_set:
 *   - the vs_set count (1);
 *   - the offset of the receiver's vs_set (always 0);
 *   - the member count (1);
 *   - private_group_name, the member whose join, leave or disconnect
 *     (per caused) triggered the notification.
 */
static void G_send_lightweight_memb( group *grp, int32 caused, char *private_group_name )
{
	message_obj	*msg;
	message_header	*hdr;
	message_link	*notice;
	int32u		vs_header[3];	/* vs_set count, my vs_set offset, members in it */
	mailbox		*boxes;
	size_t		box_count;
	size_t		i;
	int		len;
	int		ses;
	int		needed = 0;

	msg = Message_new_message();
	len = G_build_memb_buf( grp, msg, Mess_buf, caused );

	vs_header[0] = 1;	/* a single vs_set ... */
	vs_header[1] = 0;	/* ... always located at offset zero ... */
	vs_header[2] = 1;	/* ... containing one member */
	memcpy( &Mess_buf[len], vs_header, sizeof( vs_header ) );
	len += sizeof( vs_header );

	/* the lone vs_set member: the joiner/leaver/disconnecter */
	memcpy( &Mess_buf[len], private_group_name, MAX_GROUP_NAME );
	len += MAX_GROUP_NAME;

	hdr = Message_get_message_header( msg );
	hdr->data_len += ( 3*sizeof(int32) + MAX_GROUP_NAME );

	notice = new( MESSAGE_LINK );
	Message_Buffer_to_Message_Fragments( msg, Mess_buf, len );
	notice->mess = msg;
	Obj_Inc_Refcount( notice->mess );

	/* Snapshot the mailbox array bounds before writing to any session. */
	boxes = (mailbox *) grp->mboxes.begin;
	box_count = grp->mboxes.size;
	for( i = 0; i < box_count; i++ )
	{
		ses = Sess_get_session_index( boxes[i] );
		if( Is_memb_session( Sessions[ ses ].status ) )
			Sess_write( ses, notice, &needed );
	}

	if( !needed )
		Sess_dispose_message( notice );
	Message_Dec_Refcount( msg );
}
1405
/*
 * G_send_self_leave: tell local session ses that it has left group grp.
 *
 * The notification is a CAUSED_BY_LEAVE message named after the group. It
 * carries no member names and no data. It is written only if ses is still a
 * membership session.
 */
static void G_send_self_leave( group *grp, int ses )
{
	message_obj	*msg = Message_new_message();
	message_header	*hdr = Message_get_message_header( msg );
	message_link	*notice;
	int		needed = 0;

	hdr->type = Set_endian( CAUSED_BY_LEAVE );
	hdr->hint = Set_endian( 0 );
	memcpy( hdr->private_group_name, grp->name, MAX_GROUP_NAME );
	hdr->num_groups = 0;
	hdr->data_len = 0;

	notice = new( MESSAGE_LINK );
	/* Mess_buf only serves as a valid pointer here: zero bytes of it are read. */
	Message_Buffer_to_Message_Fragments( msg, Mess_buf, 0 );
	notice->mess = msg;
	Obj_Inc_Refcount( notice->mess );

	if( Is_memb_session( Sessions[ ses ].status ) )
		Sess_write( ses, notice, &needed );

	if( !needed )
		Sess_dispose_message( notice );
	Message_Dec_Refcount( msg );
}
1436
/*
 * G_send_heavyweight_memb: send grp's CAUSED_BY_NETWORK membership (with
 * vs_sets) to its local members when there is no new joiner to single out.
 */
static void G_send_heavyweight_memb( group *grp )
{
	member	*no_joiner = NULL;
	mailbox	no_joiner_mbox = -1;

	G_send_heavyweight_join( grp, no_joiner, no_joiner_mbox );
}
1441
1442 /* If there is a new member, then joiner will be non-null.
1443 * new_mbox is -1 unless there is a new member, and it's local. */
/*
 * G_send_heavyweight_join: send a CAUSED_BY_NETWORK regular membership, with
 * vs_sets built by G_build_memb_vs_buf, to the local members of grp.
 *
 * When joiner is non-NULL and new_mbox names a local mailbox, that mailbox
 * is skipped in the main loop. It instead gets its own copy of the message,
 * whose local vs_set offset is patched (see below).
 */
static void G_send_heavyweight_join( group *grp, member *joiner, mailbox new_mbox )
{
	int num_bytes;
	message_link *mess_link, *joiner_mess_link;
	message_header *head_ptr;
	message_obj *msg, *joiner_msg;
	int32u temp;
	char *local_vs_set_offset_ptr;
	int needed, joiner_needed;
	int ses;
	mailbox *mbox_ptr;
	mailbox *endbox_ptr;

	msg = Message_new_message();
	num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK, joiner );

	/* create the mess_link */
	/* NOTE(review): Mess_buf is patched and fragmented again below for the joiner,
	 * so this call presumably copies the buffer into msg -- confirm. */
	mess_link = new( MESSAGE_LINK );
	Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
	mess_link->mess = msg;
	Obj_Inc_Refcount(mess_link->mess);

	/* notify local members */
	needed = 0;

	mbox_ptr = (mailbox*) grp->mboxes.begin;
	endbox_ptr = mbox_ptr + grp->mboxes.size;

	for (; mbox_ptr != endbox_ptr; ++mbox_ptr)
	{
		/* if new member is local we do not notify it here. */
		if( joiner != NULL && new_mbox == *mbox_ptr) continue;

		ses = Sess_get_session_index ( *mbox_ptr );
		if( Is_memb_session( Sessions[ ses ].status ) )
			Sess_write( ses, mess_link, &needed );
	}

	/* notify new member if local */
	if( new_mbox != -1 )
	{
		/* Use (mostly) the same message as was sent to the other local members. */
		joiner_msg = Message_new_message();
		head_ptr = Message_get_message_header(joiner_msg);
		memcpy( head_ptr, Message_get_message_header(msg), sizeof(message_header) );

		/* Change the local vs_set offset to be the right one for the joiner. */
		/* The offset field follows the member names, the group id and the
		 * vs_set count in Mess_buf. */
		local_vs_set_offset_ptr = &Mess_buf[
			head_ptr->num_groups * MAX_GROUP_NAME + sizeof(group_id) + sizeof(int32u)];
		/* Offset starts from the first vs_set's size, and goes to the size of the last. */
		/* The arithmetic points at the final vs_set, which is assumed to be the
		 * joiner's singleton set (one int32u count plus one name). */
		temp = head_ptr->data_len
			- (sizeof(int32u) + MAX_GROUP_NAME)
			- (sizeof(group_id) + 2*sizeof(int32u));
		memcpy( local_vs_set_offset_ptr, &temp, sizeof(int32u) );

		joiner_mess_link = new( MESSAGE_LINK );
		Message_Buffer_to_Message_Fragments( joiner_msg, Mess_buf, num_bytes );
		joiner_mess_link->mess = joiner_msg;
		Obj_Inc_Refcount(joiner_mess_link->mess);

		joiner_needed = 0;
		ses = Sess_get_session_index ( new_mbox );
		if( Is_memb_session( Sessions[ ses ].status ) )
			Sess_write( ses, joiner_mess_link, &joiner_needed );
		if ( !joiner_needed ) Sess_dispose_message( joiner_mess_link );
		Message_Dec_Refcount(joiner_msg);
	}
	if( !needed ) Sess_dispose_message( mess_link );
	Message_Dec_Refcount(msg);
}
1514
/*
 * G_send_trans_memb: deliver a transitional membership signal for grp to
 * every local mailbox whose session is a membership session.
 * The message is disposed if no session took it.
 */
static void G_send_trans_memb( group *grp )
{
	message_link	*trans_link = G_build_trans_mess( grp );
	mailbox		*boxes = (mailbox *) grp->mboxes.begin;
	size_t		box_count = grp->mboxes.size;
	size_t		i;
	int		ses;
	int		needed = 0;

	for( i = 0; i < box_count; i++ )
	{
		ses = Sess_get_session_index( boxes[i] );
		if( Is_memb_session( Sessions[ ses ].status ) )
			Sess_write( ses, trans_link, &needed );
	}

	if( !needed )
		Sess_dispose_message( trans_link );
}
1538
/*
 * G_handle_groups: consume one GROUPS state-exchange message.
 *
 * Only handled in GGATHER and GGT. In GOP or GTRANS it is reported with
 * SPLOG_FATAL. The following messages are discarded:
 *   - messages whose membership id differs from Reg_memb_id;
 *   - messages whose sender is not in the configuration.
 * Messages from this daemon's synced-set leader are counted but not kept.
 * Other messages are queued on the Gathered list, per sending
 * representative. Once the daemons represented by completed (AGREED)
 * messages add up to the size of Reg_memb, the new group state is computed
 * and announced, and the module shifts to GOP. If the exchange finished in
 * GGT, the saved transitional membership is then replayed.
 */
void G_handle_groups( message_link *mess_link )
{
	char *memb_id_ptr;
	membership_id temp_memb_id;
	message_obj *msg;
	message_header *head_ptr;
	proc p;
	int32u num_daemons_represented;
	int needed;
	groups_message_link *grp_mlink = NULL;

	Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups: \n" );

	switch( Gstate )
	{
	case GOP:

		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_groups in GOP\n");

		break;

	case GTRANS:

		Alarmp( SPLOG_FATAL, GROUPS, "G_handle_groups in GTRANS\n");

		break;

	case GGATHER:
	case GGT:

		if (Gstate == GGATHER) Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups in GGATHER\n");
		if (Gstate == GGT) Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups in GGT\n");

		/* Hold a reference while inspecting the message. It is released on every exit path below. */
		msg = mess_link->mess;
		Obj_Inc_Refcount(msg);
		head_ptr = Message_get_message_header(msg);
		/* The message data starts with the sender's membership id. */
		memb_id_ptr = Message_get_first_data_ptr(msg);
		memcpy( &temp_memb_id, memb_id_ptr, sizeof( membership_id ) );
		if( !Same_endian( head_ptr->type ) )
		{
			/* Flip membership id */
			temp_memb_id.proc_id = Flip_int32( temp_memb_id.proc_id );
			temp_memb_id.time = Flip_int32( temp_memb_id.time );
		}
		if( ! Memb_is_equal( temp_memb_id, Reg_memb_id ) )
		{
			Alarmp( SPLOG_INFO, GROUPS,
				"G_handle_groups: GROUPS message received from bad memb id proc %d, time %d, daemon %s.\n",
				temp_memb_id.proc_id, temp_memb_id.time, head_ptr->private_group_name );
			Sess_dispose_message( mess_link );
			Message_Dec_Refcount(msg);
			return;
		}
		if (0 > Conf_proc_by_name( head_ptr->private_group_name , &p ) )
		{
			Alarmp( SPLOG_ERROR, GROUPS, "G_handle_groups: Groups message from someone (%s) not in conf\n",
				head_ptr->private_group_name);
			Sess_dispose_message( mess_link );
			Message_Dec_Refcount(msg);
			return;
		}

		/* This is a message from my rep -- don't process it. */
		/* needed == 0 means mess_link is disposed below instead of being kept in Gathered. */
		if( Is_synced_set_leader(p.id) )
		{
			grp_mlink = &Gathered;
			needed = 0;
		/* else, find the appropriate rep's message list. */
		} else {
			needed = 1;
			for( grp_mlink = Gathered.next; grp_mlink != NULL; grp_mlink = grp_mlink->next )
				if( p.id == grp_mlink->rep_proc_id )
					break;
			if( grp_mlink == NULL )
			{
				grp_mlink = new( GROUPS_MESSAGE_LINK );
				grp_mlink->rep_proc_id = p.id;
				grp_mlink->complete = 0;
				mess_link->next = NULL;
				grp_mlink->first = mess_link;
				grp_mlink->next = Gathered.next;
				Gathered.next = grp_mlink;
			} else {
				/* Messages are pushed onto the front of the rep's list. */
				mess_link->next = grp_mlink->first;
				grp_mlink->first = mess_link;
			}
		}

		Num_mess_gathered++;
		/* The last Groups message a daemon sends is AGREED. */
		if( Is_agreed_mess( head_ptr->type ) )
		{
			/* The last 4 bytes of the private group name field are overridden to hold
			 * the size of the synced set of this daemon. This way, G_handle_groups
			 * doesn't have to find the groups message containing the synced set. */
			memcpy( &num_daemons_represented,
				&(head_ptr->private_group_name[MAX_GROUP_NAME-sizeof(int32u)]),
				sizeof(int32u) );
			if( !Same_endian( head_ptr->type ) )
				num_daemons_represented = Flip_int32( num_daemons_represented );
			Num_daemons_gathered += num_daemons_represented;
			grp_mlink->complete = 1;
		}

		Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups: GROUPS message received from %s - msgs %d, daemons %d\n",
			head_ptr->private_group_name, Num_mess_gathered, Num_daemons_gathered );

		/* At this point, we no longer need to work with the message object in this function. */
		if( !needed )
			Sess_dispose_message( mess_link );
		Message_Dec_Refcount(msg);


		/* Keep gathering until every daemon in Reg_memb is accounted for. */
		if( Num_daemons_gathered != Conf_num_procs( &Reg_memb ) )
		{
			return;
		}
		Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n",
			Num_mess_gathered, Num_daemons_gathered );
		/* Replace protocol queue */
		Prot_set_down_queue( NORMAL_DOWNQUEUE );

		/* lower events threshold */
		Session_threshold = LOW_PRIORITY;
		Sess_set_active_threshold();

		/*
		 * Compute new groups membership and notify members of
		 * groups that have changed
		 */
		G_compute_and_notify();

		if( Gstate == GGATHER )
		{
			G_shift_to_GOP();
		}else{
			G_shift_to_GOP();
			/* We do want to deliver a transitional signal to any
			 * groups that are going to get a CAUSED_BY_NETWORK
			 * after our Reg_memb is delivered. */
			G_handle_trans_memb( Trans_memb, Trans_memb_id );
		}

		break;
	}
}
1685
/*
 * G_compute_and_notify: finish a state exchange.
 *
 * Merges every gathered GROUPS message (from other synced sets) into
 * GroupsList and MySyncedSet, disposing the messages and their per-rep list
 * nodes. Then, for every group marked changed: assigns a fresh group id
 * (Reg_memb_id, index 1), clears the changed flag, sends a heavyweight
 * membership if the group has local mailboxes, and updates its daemons'
 * membership ids. Finally it resets the gather counters and discards this
 * daemon's groups buffers.
 */
static void G_compute_and_notify()
{
	group *grp;
	int ret;
	groups_message_link *grp_mlink;
	message_link *mess_link;
	synced_set sset;
	stdit it;

	Alarmp( SPLOG_INFO, GROUPS, "G_compute_and_notify:\n");
	/* Add contents of groups messages from other synced sets to my GroupsList,
	 * from gathered messages. Then discard messages */

	for( grp_mlink = Gathered.next; grp_mlink != NULL; )
	{
		/* NOTE(review): sset is only filled by G_mess_to_groups; this relies on every
		 * rep list holding at least one message (as G_handle_groups creates them). */
		for( mess_link = grp_mlink->first; mess_link != NULL; )
		{
			ret = G_mess_to_groups( mess_link, &sset );
			if( ret < 0 )
				Alarmp( SPLOG_FATAL, GROUPS, "G_compute_and_notify:"
					" G_mess_to_groups errored %d\n", ret );
			grp_mlink->first = mess_link->next;
			Sess_dispose_message( mess_link );
			mess_link = grp_mlink->first;
		}
		G_add_to_synced_set( &sset );
		/* Unlink and free this rep's list node; Gathered.next advances to the next one. */
		Gathered.next = grp_mlink->next;
		dispose( grp_mlink );
		grp_mlink = Gathered.next;
	}
	G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_compute_and_notify" );

	/* At this point, our GroupsList is complete, as is our synced_set. */

	for (stdskl_begin(&GroupsList, &it); !stdskl_is_end(&GroupsList, &it); )
	{
		grp = *(group**) stdskl_it_key(&it);
		stdskl_it_next(&it); /* NOTE: need to do advancement before potential erasure below */

		/*
		 * for every group:
		 * If the group has changed (*)
		 * Set new gid
		 * notify all local members (who came with whom)
		 *
		 * Note: the group is changed if any of the following is true:
		 * (1) It had partitioned members during the transitional period
		 * [If it lost members to transitional, or gained some
		 * because of a join from a partitioned daemon.]
		 * (2) Daemons marked with different memb_ids were included
		 * [We found new daemons with info for this group,
		 * according to G_mess_to_groups.]
		 * (3) If we got a cascading transitional that affects this group.
		 * Note: I'm not sure that this behavior is necessary, but it's
		 * consistent with all versions of Spread that I know.
		 * See GGATHER case of G_handle_trans_memb for more info.
		 */

		if( !grp->changed )
			continue;
		/* the group has changed */
		grp->grp_id.memb_id = Reg_memb_id;
		grp->grp_id.index = 1;
		grp->changed = FALSE;
		if( !stdarr_empty(&grp->mboxes) )
			G_send_heavyweight_memb( grp );
		/* Runs for every changed group, whether or not it has local mailboxes. */
		G_update_daemon_memb_ids( grp );
	}
	Gathered.complete = 0;
	Num_mess_gathered = 0;
	Num_daemons_gathered = 0;

	/* We're going back to GOP... destroy our groups messages. */
	G_discard_groups_bufs();
	Groups_bufs_fresh = 0;

	G_print();
}
1764
1765 /* Commented out -- not currently needed
1766 *static int G_id_is_equal( group_id g1, group_id g2 )
1767 *{
1768 * if( g1.index == g2.index && Memb_is_equal( g1.memb_id, g2.memb_id ) )
1769 * return( 1 );
1770 * else return( 0 );
1771 *}
1772 */
1773
G_get_group(char * group_name)1774 static group *G_get_group( char *group_name )
1775 {
1776 stdit it;
1777
1778 stdskl_find(&GroupsList, &it, &group_name);
1779
1780 return (!stdskl_is_end(&GroupsList, &it) ? *(group**) stdskl_it_key(&it) : NULL);
1781 }
1782
G_get_daemon(group * grp,int32u proc_id)1783 static daemon_members *G_get_daemon( group *grp, int32u proc_id )
1784 {
1785 stdit it;
1786 int32u * proc_id_ptr = &proc_id;
1787
1788 stdskl_find(&grp->DaemonsList, &it, &proc_id_ptr);
1789
1790 return (!stdskl_is_end(&grp->DaemonsList, &it) ? *(daemon_members**) stdskl_it_key(&it) : NULL);
1791 }
1792
G_get_member(daemon_members * dmn,char * private_group_name)1793 static member *G_get_member( daemon_members *dmn, char *private_group_name )
1794 {
1795 stdit it;
1796
1797 stdskl_find(&dmn->MembersList, &it, &private_group_name);
1798
1799 return (!stdskl_is_end(&dmn->MembersList, &it) ? *(member**) stdskl_it_key(&it) : NULL);
1800 }
1801
G_build_trans_mess(group * grp)1802 static message_link *G_build_trans_mess( group *grp )
1803 {
1804 /*
1805 * This routine builds a ready-to-be-sent transitional message signal
1806 * to the members of the process group grp
1807 */
1808
1809 /* FIXME: the documentation says the gid field isn't there. Should
1810 * it be removed? */
1811
1812 message_link *mess_link;
1813 scatter *scat;
1814 message_header *head_ptr;
1815 char *gid_ptr;
1816
1817 mess_link = new( MESSAGE_LINK );
1818 mess_link->mess = Message_create_message(TRANSITION_MESS, grp->name);
1819
1820 scat = Message_get_data_scatter(mess_link->mess);
1821 scat->elements[0].len = Message_get_data_header_size() +
1822 sizeof( group_id );
1823 head_ptr = Message_get_message_header(mess_link->mess);
1824 gid_ptr = Message_get_first_data_ptr(mess_link->mess );
1825
1826 head_ptr->data_len = sizeof( group_id );
1827 memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );
1828
1829 return( mess_link );
1830 }
1831
1832 /* The buffer built needs to be deterministic and ordered according first
1833 * to daemon order in conf, second by member name. */
/*
 * G_build_memb_buf: fill msg's header and buf with the regular membership of grp.
 *
 * The header becomes a REG_MEMB_MESS, or-ed with caused, named after the
 * group, with num_groups = member count and data_len = sizeof(group_id).
 * buf receives every member's MAX_GROUP_NAME-byte name, walking DaemonsList
 * and then each daemon's MembersList in list order, followed by grp->grp_id.
 * Returns the number of bytes written to buf.
 */
static int G_build_memb_buf( group *grp, message_obj *msg, char buf[], int32 caused )
{
	message_header	*hdr = Message_get_message_header( msg );
	daemon_members	*dmn;
	member		*mbr;
	stdit		dit, mit;
	int		offset = 0;

	hdr->type = Set_endian( REG_MEMB_MESS );
	hdr->type |= caused;
	hdr->hint = Set_endian( 0 );
	memcpy( hdr->private_group_name, grp->name, MAX_GROUP_NAME );
	hdr->num_groups = grp->num_members;
	hdr->data_len = sizeof( group_id );

	/* Nothing is erased here, so iterators advance at the end of each pass. */
	for( stdskl_begin( &grp->DaemonsList, &dit ); !stdskl_is_end( &grp->DaemonsList, &dit ); stdskl_it_next( &dit ) )
	{
		dmn = *(daemon_members **) stdskl_it_key( &dit );

		for( stdskl_begin( &dmn->MembersList, &mit ); !stdskl_is_end( &dmn->MembersList, &mit ); stdskl_it_next( &mit ) )
		{
			mbr = *(member **) stdskl_it_key( &mit );
			memcpy( &buf[offset], mbr->name, MAX_GROUP_NAME );
			offset += MAX_GROUP_NAME;
		}
	}

	memcpy( &buf[offset], &grp->grp_id, sizeof( group_id ) );
	offset += sizeof( group_id );

	return offset;
}
1875
1876
static int G_build_memb_vs_buf( group *grp, message_obj *msg, char buf[], int32 caused, member *joiner )
{
    /*
     * This routine builds the memb buffer message, including a virtual synchrony
     * (failure atomicity) part with a set of vs_sets (ordered deterministically by the
     * daemon membership IDs). Each vs_set specifies a set of members (ordered deterministically
     * by daemon order in conf, then by private group name) with the property that the members
     * listed are either virtually synchronous with each other, or crashed.
     *
     * Partitioned daemons get singleton sets, as do new joiners in the case of
     * a join delivered during a transitional period for a changed group. That is, we provide
     * all the information we have, which is that the members at a given daemon are together.
     *
     * Note that in (non-GTRANS/changed) join, leave, and disconnect we provide the member
     * that joined, left, or got disconnected in the vs_set. Therefore, caused will always be
     * CAUSED_BY_NETWORK.
     * The joiner should be NULL, except in the case of a join during GTRANS for a group
     * that has some partitioned daemons.
     *
     * Returns the total number of bytes written to buf (the G_build_memb_buf
     * portion plus the vs-set data appended here). head_ptr->data_len is grown
     * by the size of every field appended here.
     */

    /* The buffer constructed should have two regions, as exposed to the user:
     *   groups array (ordered by daemon order in conf, then by member name)
     *   data
     *
     * The data portion should look like the following:
     *   group id (group_id)
     *   number of vs sets (int32u)
     *   offset to the vs set for the member this is sent to (This is a byte offset into
     *     the vs_set region. Could do just vs_set number, but that would be slower in the
     *     [assumed to be] common case that people just want their set.) (int32u)
     *   vs sets (ordered by group id, with partitioned daemons singleton, and joiner last)
     *
     * Each vs set looks like:
     *   number of members (int32u)
     *   members (ordered by daemon order in conf, then by member name) (array of group names)
     */

    int num_bytes;
    message_header *head_ptr;
    char *vs_set_region_ptr; /* int32u */
    char *num_vs_sets_ptr; /* int32u */
    int32u num_vs_sets;
    char *local_vs_set_offset_ptr; /* int32u */
    int32u local_vs_set_offset;

    char *curr_vs_set_size_ptr; /* int32u */
    int32u curr_vs_set_size;
    membership_id curr_vs_set_memb_id;

    daemon_members *dmn;
    member *mbr;
    char *membs_ptr;
    stdskl temp;
    int needed;
    int found_joiner = 0;
    stdit it, mit;

    /* Member names + group id first; the vs-set data is appended after them. */
    num_bytes = G_build_memb_buf( grp, msg, buf, caused );
    head_ptr = Message_get_message_header(msg);

    /* Reserve the vs-set count; it is filled in at the very end. */
    num_vs_sets_ptr = &buf[num_bytes];
    num_bytes += sizeof( int32u );
    head_ptr->data_len += sizeof( int32u );
    num_vs_sets = 0;

    /* Reserve the offset of the local daemon's vs set; filled in when found. */
    local_vs_set_offset_ptr = &buf[num_bytes];
    num_bytes += sizeof( int32u );
    head_ptr->data_len += sizeof( int32u );
    /* This function is only called if I have local members. So, if the offset
     * isn't found, then the joiner is my member. */
    local_vs_set_offset = 0;
    /* NOTE(review): the offset slot is only written when my daemon is found in
     * the loop below; otherwise it keeps whatever bytes buf already held. */

    /* Points to the front of the vs_sets */
    vs_set_region_ptr = &buf[num_bytes];

    /* use a skiplist to sort all of the group's daemons by memb_id (primary) and then by proc_id (secondary) */
    /* temp only stores daemon_members pointers; it does not own the daemons. */

    if (stdskl_construct(&temp, sizeof(daemon_members*), 0, G_compare_daemon_vs_set) != 0) {
        Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
    }

    if (stdskl_put_seq_n(&temp, NULL, stdskl_begin(&grp->DaemonsList, &it), stdskl_size(&grp->DaemonsList), STDFALSE) != 0) {
        Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
    }

    curr_vs_set_memb_id = unknown_memb_id;
    curr_vs_set_size_ptr = NULL;

    for (stdskl_begin(&temp, &it); !stdskl_is_end(&temp, &it); stdskl_it_next(&it))
    {
        dmn = *(daemon_members**) stdskl_it_key(&it);
        /* Daemons sharing a memb_id are adjacent in temp, so a new vs set is
         * started whenever the memb_id changes (always for the first daemon,
         * because curr_vs_set_memb_id starts out unknown). */
        needed = 0;
        if( Is_unknown_memb_id(&curr_vs_set_memb_id) ||
            !Memb_is_equal( curr_vs_set_memb_id, dmn->memb_id ) )
            needed = 1;
        if( needed ) {
            num_vs_sets++;
            curr_vs_set_memb_id = dmn->memb_id;
            curr_vs_set_size_ptr = &buf[num_bytes];
            num_bytes += sizeof(int32u);
            head_ptr->data_len += sizeof(int32u);
            curr_vs_set_size = 0;
        }
        if( dmn->proc_id == My.id ) {
            /* NOTE(review): this duplicate check cannot fire if my vs set was
             * the first one, since its offset is 0. */
            if( local_vs_set_offset != 0 )
                Alarmp( SPLOG_FATAL, GROUPS, "G_build_memb_vs_buf: Found my vs set twice for group %s\n",
                        grp->name );
            local_vs_set_offset = curr_vs_set_size_ptr - vs_set_region_ptr;
            memcpy( local_vs_set_offset_ptr, &local_vs_set_offset, sizeof(int32u) );
        }

        for (stdskl_begin(&dmn->MembersList, &mit); !stdskl_is_end(&dmn->MembersList, &mit); stdskl_it_next(&mit))
        {
            mbr = *(member**) stdskl_it_key(&mit);

            /* Handle changed-group join during transitional. The joiner does not
             * get to be listed with everyone else from his daemon, but rather at
             * the end, in a self vs set. */
            /* NOTE(review): if the joiner is the only member of a daemon that
             * started its own vs set, that vs set is emitted with size 0. */
            if( joiner != NULL && !found_joiner ) {
                if( strcmp( joiner->name, mbr->name ) == 0 )
                {
                    found_joiner = 1;
                    continue;
                }
            }
            membs_ptr = &buf[num_bytes];
            num_bytes += MAX_GROUP_NAME;
            head_ptr->data_len += MAX_GROUP_NAME;
            memcpy( membs_ptr, mbr->name, MAX_GROUP_NAME );
            curr_vs_set_size++;
        }
        /* Every time we finish one daemon, update the size of the current vs_set */
        memcpy( curr_vs_set_size_ptr, &curr_vs_set_size, sizeof(int32u) );
    }
    /* Append the joiner's singleton vs set last. */
    if( joiner != NULL ) {
        if( !found_joiner )
            Alarmp( SPLOG_FATAL, GROUPS, "G_build_memb_vs_buf: Expected to find joining member %s.\n",
                    joiner->name );
        num_vs_sets++;
        curr_vs_set_size_ptr = &buf[num_bytes];
        num_bytes += sizeof(int32u);
        head_ptr->data_len += sizeof(int32u);
        curr_vs_set_size = 1;
        memcpy( curr_vs_set_size_ptr, &curr_vs_set_size, sizeof(int32u) );
        membs_ptr = &buf[num_bytes];
        memcpy( membs_ptr, joiner->name, MAX_GROUP_NAME );
        num_bytes += MAX_GROUP_NAME;
        head_ptr->data_len += MAX_GROUP_NAME;
    }
    /* Make sure we don't leak memory before the stack gets freed and takes
     * the skiplist with it. We don't actually want to free the daemons. */
    stdskl_destruct(&temp);
    memcpy( num_vs_sets_ptr, &num_vs_sets, sizeof(int32u) );

    return( num_bytes );
}
2033
/* Fill in msg's header for a GROUPS_MESS whose data region is groups_bytes
 * long.  private_group_name carries this daemon's name (My.name), and its
 * last sizeof(int32u) bytes are overwritten with MySyncedSet.size.  This lets
 * G_handle_groups learn the synced-set size without parsing the data region.
 * num_groups is 0. */
static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes )
{
    message_header *hdr = Message_get_message_header( msg );
    char           *sset_size_slot;

    hdr->type = GROUPS_MESS;
    hdr->type = Set_endian( hdr->type );
    hdr->hint = Set_endian( 0 );

    memset( hdr->private_group_name, 0, MAX_GROUP_NAME );
    /* My.name is bounded by MAX_PROC_NAME (at most 20 bytes incl. NUL). */
    strcpy( hdr->private_group_name, My.name );

    sset_size_slot = &hdr->private_group_name[MAX_GROUP_NAME - sizeof(int32u)];
    memcpy( sset_size_slot, &(MySyncedSet.size), sizeof(int32u) );

    hdr->num_groups = 0;
    hdr->data_len   = groups_bytes;
}
2054
2055 /* This function guarantees that each daemon's data about a given group appears in only one buffer in
2056 * a sequence, and that the sorted order is preserved from the GroupsList. */
/* Serialize as much of GroupsList as fits into one GROUPS buffer (buf).
 *
 * buf        - destination, assumed to hold GROUPS_BUF_SIZE bytes.
 * git, dit   - resume state: the group iterator over GroupsList and the
 *              daemon iterator over that group's DaemonsList.  On the first
 *              call both are initialized here; on later calls serialization
 *              resumes exactly where the previous call stopped.
 * first_time - non-zero for the first buffer of a sequence.  Only that buffer
 *              carries the synced-set block.
 *
 * Returns the number of bytes written to buf.  When this returns, git is at
 * the end of GroupsList iff every group has been fully written. */
static int G_build_groups_buf( char buf[], stdit *git, stdit *dit, int first_time)
{
    int num_bytes;

    char *memb_id_ptr;
    char *flag_ptr;             /* char, only need 1 bit, really */
    char *synced_set_size_ptr;  /* int32u */
    char *synced_set_procs_ptr; /* int32 */

    group *grp;
    daemon_members *dmn;
    char *proc_id_ptr;          /* int32 */
    char *dmn_memb_id_ptr;      /* membership_id */
    member *mbr;
    char *num_dmns_ptr;         /* int16u */
    int16u num_dmns;
    char *num_memb_ptr;         /* int16u */
    int16u num_memb;
    char *memb_ptr;

    int32u size_needed;
    int couldnt_fit_daemon;
    stdit mit;
    int16u send_group_changed;

    /* A GROUPS message looks like this:
     * (Representative's name is in header, so we can get his proc id)
     * This is necessary, because we store received GROUPS messages by
     * synced set, to more easily recognize which to keep.
     *   Membership id
     *   flag (1 if first message from set, 0 else) (char)
     *   if flag is 1
     *     size of synced set (int32u)
     *     proc ids of synced set represented by this daemon (int32*size)
     *   For each group:
     *     group name (repeated for each message it appears in) (MAX_GROUP_NAME)
     *     group_id at the representative
     *       [Either there is only one ID for the group (i.e. the group is not
     *        changed in any respect by this membership) or this ID can be
     *        discarded. This is here so that daemons that don't know about
     *        the group at all can get the correct ID in the unchanged case.]
     *     changed flag (int16u --boolean so always 0 or 1)
     *     number of daemons for this group (in this message) (int16u)
     *     For each daemon:
     *       daemon proc id (int32)
     *       memb id at daemon (membership_id)
     *       number of local members at daemon (int16u)
     *       For each local member at daemon
     *         member's private group name (MAX_GROUP_NAME)
     */

    num_bytes = 0;

    memb_id_ptr = &buf[num_bytes];
    num_bytes += sizeof( membership_id );
    memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );

    flag_ptr = &buf[num_bytes];
    num_bytes += sizeof(char);

    if (!first_time) {
        Set_later_message(flag_ptr);

    } else {
        Set_first_message(flag_ptr);
        synced_set_size_ptr = &buf[num_bytes];
        num_bytes += sizeof(int32u);
        memcpy( synced_set_size_ptr, &(MySyncedSet.size), sizeof(int32u) );
        synced_set_procs_ptr = &buf[num_bytes];
        num_bytes += MySyncedSet.size * sizeof(int32);
        memcpy( synced_set_procs_ptr, &MySyncedSet.proc_ids, MySyncedSet.size*sizeof(int32) );

        stdskl_begin(&GroupsList, git);
    }

    /* Resume where we left off in the GroupsList */
    couldnt_fit_daemon = 0;
    while (!stdskl_is_end(&GroupsList, git))
    {
        grp = *(group**) stdskl_it_key(git);

        if (first_time) { /* initialize dit on first call to this fcn */
            stdskl_begin(&grp->DaemonsList, dit);
        }

        /* To have information about this group, we need to be able to fit
         * its name, ID, and the number of daemons it has in this message. */
        size_needed = GROUPS_BUF_GROUP_INFO_SIZE + Message_get_data_header_size();
        if( size_needed > GROUPS_BUF_SIZE - num_bytes ) break;

        memcpy( &buf[num_bytes], grp->name, MAX_GROUP_NAME );
        num_bytes += MAX_GROUP_NAME;

        memcpy( &buf[num_bytes], &grp->grp_id, sizeof(group_id) );
        num_bytes += sizeof(group_id);

        send_group_changed = grp->changed;
        memcpy( &buf[num_bytes], &send_group_changed, sizeof(int16u) );
        num_bytes += sizeof(int16u);

        num_dmns_ptr = &buf[num_bytes];
        num_bytes += sizeof(int16u);
        num_dmns = 0;

        for (; !stdskl_is_end(&grp->DaemonsList, dit); stdskl_it_next(dit))
        {
            dmn = *(daemon_members**) stdskl_it_key(dit);
            /* To store this daemon's information about the current group,
             * we need to be able to store its proc_id, memb_id, number of
             * local members, and the private group names of its local members. */
            size_needed = GROUPS_BUF_DAEMON_INFO_SIZE +
                (stdskl_size(&dmn->MembersList) * MAX_GROUP_NAME) + Message_get_data_header_size();
            /* This requires that the number of local group members be limited:
             * a daemon that cannot fit even in an otherwise empty buffer would
             * never make progress. */
            if( size_needed > GROUPS_BUF_SIZE - num_bytes )
            {
                couldnt_fit_daemon = 1;
                break;
            }
            proc_id_ptr = &buf[num_bytes];
            num_bytes += sizeof(int32);
            memcpy( proc_id_ptr, &dmn->proc_id, sizeof(int32) );

            /* NOTE(review): the format above says "memb id at daemon", but the
             * group's grp_id.memb_id is written here, not dmn->memb_id.  The
             * receiver (G_mess_to_groups) stores this value as the daemon's
             * memb_id.  Confirm that this is intentional. */
            dmn_memb_id_ptr = &buf[num_bytes];
            num_bytes += sizeof(membership_id);
            memcpy( dmn_memb_id_ptr, &grp->grp_id.memb_id, sizeof(membership_id) );

            num_memb_ptr = &buf[num_bytes];
            num_bytes += sizeof(int16u);
            num_memb = 0;

            for (stdskl_begin(&dmn->MembersList, &mit); !stdskl_is_end(&dmn->MembersList, &mit); stdskl_it_next(&mit))
            {
                mbr = *(member**) stdskl_it_key(&mit);
                /* Add to the buffer all group members from this daemon. */
                memb_ptr = &buf[num_bytes];
                num_bytes += MAX_GROUP_NAME;
                memcpy( memb_ptr, mbr->name, MAX_GROUP_NAME );
                num_memb++;
            }
            memcpy( num_memb_ptr, &num_memb, sizeof(int16u) );

            /* The skiplist size is not an int; cast it so it matches %d. */
            if( num_memb != stdskl_size(&dmn->MembersList) )
                Alarmp( SPLOG_FATAL, GROUPS, "G_build_groups_buf: group %s has %d %d members\n",
                        grp->name, (int) num_memb, (int) stdskl_size(&dmn->MembersList) );
            num_dmns++;
        }
        memcpy( num_dmns_ptr, &num_dmns, sizeof(int16u) );
        /* Leave git/dit on this group/daemon so the next buffer resumes here. */
        if( couldnt_fit_daemon )
            break;

        stdskl_it_next(git); /* advance group iterator */

        if (!stdskl_is_end(&GroupsList, git)) { /* if loop not done, then init dit iterator for the advanced git */
            grp = *(group**) stdskl_it_key(git);
            stdskl_begin(&grp->DaemonsList, dit);
        }
    }
    return( num_bytes );
}
2216
G_build_new_groups_bufs()2217 static void G_build_new_groups_bufs()
2218 {
2219 stdit git, dit;
2220 int first_time = 1;
2221 groups_buf_link *grps_buf_link;
2222
2223 do {
2224 grps_buf_link = new( GROUPS_BUF_LINK );
2225 grps_buf_link->next = Groups_bufs;
2226 Groups_bufs = grps_buf_link;
2227 grps_buf_link->bytes = G_build_groups_buf(grps_buf_link->buf, &git, &dit, first_time);
2228 first_time = 0;
2229
2230 } while (!stdskl_is_end(&GroupsList, &git));
2231 }
2232
2233 /* This function used to be called G_refresh_groups_msg. */
G_stamp_groups_bufs()2234 static void G_stamp_groups_bufs()
2235 {
2236 groups_buf_link *curr;
2237 char *memb_id_ptr;
2238 for( curr = Groups_bufs; curr; curr = curr->next )
2239 {
2240 memb_id_ptr = curr->buf;
2241 memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
2242 }
2243 }
2244
G_discard_groups_bufs()2245 static void G_discard_groups_bufs()
2246 {
2247 groups_buf_link *next;
2248
2249 for( ; Groups_bufs; Groups_bufs = next )
2250 {
2251 next = Groups_bufs->next;
2252 dispose( Groups_bufs );
2253 }
2254 return;
2255 }
2256
G_send_groups_messages()2257 static int G_send_groups_messages()
2258 {
2259 groups_buf_link *grps_buf_link;
2260 down_link *down_ptr;
2261 message_obj *msg;
2262 message_header *head_ptr;
2263 int i = 0;
2264
2265 for( grps_buf_link = Groups_bufs; grps_buf_link != NULL; grps_buf_link = grps_buf_link->next ) {
2266 msg = Message_new_message();
2267 G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
2268 head_ptr = Message_get_message_header(msg);
2269 if( grps_buf_link->next )
2270 head_ptr->type |= RELIABLE_MESS;
2271 else
2272 head_ptr->type |= AGREED_MESS;
2273 Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
2274
2275 down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
2276 down_ptr->mess = msg;
2277 Obj_Inc_Refcount(down_ptr->mess);
2278 /* Use control queue--not normal session queues */
2279 Prot_new_message( down_ptr, Groups_control_down_queue );
2280 Message_Dec_Refcount(msg);
2281 ++i;
2282 }
2283 return i;
2284 }
2285
2286 /* This function fills the synced set from the synced set portion of a
2287 * groups message if there is one, and adds all the group membership
2288 * information to the GroupsList. */
static int G_mess_to_groups( message_link *mess_link, synced_set *sset )
{
    /* The function returns 0 for success or -1 if an error occurred.
     * Right now, there are no errors that can occur (every path returns 0).
     * However, if we add stricter checks on the daemons, that may change.
     *
     * Parses one GROUPS message (format documented in G_build_groups_buf):
     * if it is the first message from its synced set, sset is filled from it;
     * every group/daemon/member it describes is added to GroupsList, creating
     * groups that are not yet known. */

    message_obj *msg;
    message_header *head_ptr;
    scatter *scat;
    int num_bytes, total_bytes;

    char *flag_ptr; /* char, only need 1 bit, really */
    char *synced_set_size_ptr; /* int32u */
    char *synced_set_procs_ptr; /* int32 */

    group *grp;
    char *group_name_ptr;
    int16u num_dmns;
    daemon_members *dmn;
    int16u num_memb;
    member *mbr;
    int i,j;
    char ip_string[16];
    stdit it;
    int16u sent_group_changed;

    /* Flatten all scatter elements into Temp_buf.
     * NOTE(review): no check that total_bytes fits in Temp_buf; this assumes
     * Temp_buf is sized for the largest GROUPS message. Confirm. */
    total_bytes = 0;
    msg = mess_link->mess;
    scat = Message_get_data_scatter(msg);
    for( i=0; i < scat->num_elements ; i++ )
    {
        memcpy( &Temp_buf[total_bytes], scat->elements[i].buf, scat->elements[i].len );
        total_bytes += scat->elements[i].len;
    }

    /* The flattened data starts with the data header; skip it. */
    num_bytes = Message_get_data_header_size();
    head_ptr = Message_get_message_header(msg);

    Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: message from rep %s\n", head_ptr->private_group_name );
    /* The membership id at the front is skipped, not read, here. */
    num_bytes += sizeof( membership_id );

    flag_ptr = &Temp_buf[num_bytes];
    num_bytes += sizeof(char);
    if( Is_first_message(flag_ptr) )
    {
        /* Populate the synced_set from the first message */
        /* NOTE(review): sset->size comes from the wire and is not bounded
         * against the capacity of sset->proc_ids before the memcpy below. */
        synced_set_size_ptr = &Temp_buf[num_bytes];
        num_bytes += sizeof(int32u);
        memcpy( &sset->size, synced_set_size_ptr, sizeof(int32u) );
        if( !Same_endian( head_ptr->type ) )
            sset->size = Flip_int32( sset->size );
        synced_set_procs_ptr = &Temp_buf[num_bytes];
        num_bytes += sset->size * sizeof(int32) ;
        memcpy( &sset->proc_ids, synced_set_procs_ptr, sset->size * sizeof(int32) );
        if( !Same_endian( head_ptr->type ) )
            for( i = 0; i < sset->size; ++i )
                sset->proc_ids[i] = Flip_int32( sset->proc_ids[i] );
    }

    /* Read the groups data, and insert it into the GroupsList */
    for( ; num_bytes < total_bytes; )
    {
        group_name_ptr = &Temp_buf[num_bytes];
        num_bytes += MAX_GROUP_NAME;

        Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: group %s\n", group_name_ptr );
        /* Create a group if necessary, and be careful to mark as changed if needed. */
        grp = G_get_group( group_name_ptr );
        if( grp == NULL )
        {
            grp = new( GROUP );
            memset( grp->name, 0, MAX_GROUP_NAME );
            strcpy( grp->name, group_name_ptr );

            if (stdskl_construct(&grp->DaemonsList, sizeof(daemon_members*), 0, G_compare_proc_ids_by_conf) != 0) {
                Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
            }

            if (stdarr_construct(&grp->mboxes, sizeof(mailbox), 0) != 0) {
                Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
            }

            grp->changed = FALSE;
            grp->num_members = 0;

            /* Set a group id here, so that if the group isn't changed,
             * everyone will have the right ID (because all must have same). */
            memcpy( &grp->grp_id, &Temp_buf[num_bytes], sizeof(group_id) );
            if( !Same_endian( head_ptr->type ) )
            {
                /* Flip group id */
                grp->grp_id.memb_id.proc_id = Flip_int32( grp->grp_id.memb_id.proc_id );
                grp->grp_id.memb_id.time = Flip_int32( grp->grp_id.memb_id.time );
                grp->grp_id.index = Flip_int32( grp->grp_id.index );
            }

            if (stdskl_put(&GroupsList, NULL, &grp, NULL, STDFALSE) != 0) {
                Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
            }

            Num_groups++;
            GlobalStatus.num_groups = Num_groups;
        }
        /* For an already known group, the sent group_id is skipped. */
        num_bytes += sizeof(group_id);
        /* Get the changed flag for sent group and set local group changed flag if sent group was marked changed */
        /* Not endian-flipped: only its zero/non-zero value is used, which byte order does not affect. */
        memcpy( &sent_group_changed, &Temp_buf[num_bytes], sizeof(int16u) );
        num_bytes += sizeof(int16u);

        if (sent_group_changed)
            grp->changed = TRUE;

        memcpy( &num_dmns, &Temp_buf[num_bytes], sizeof(int16u) );
        num_bytes += sizeof(int16u);
        if( !Same_endian( head_ptr->type ) )
            num_dmns = Flip_int16( num_dmns );
        Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \twith %u daemons\n", num_dmns );

        /* For each daemon in the message for this group.
         * Create a daemon object, and add it to the DaemonsList.
         * Add all members to the MembersList. */
        for( i = 0; i < num_dmns; ++i )
        {
            /* FIXME: If I was paranoid, I could always check here that the daemon
             * isn't already in my GroupsList, or that it is in my conf (from Reg_memb). */
            dmn = new( DAEMON_MEMBERS );
            memcpy( &dmn->proc_id, &Temp_buf[num_bytes], sizeof(int32) );
            num_bytes += sizeof(int32);
            memcpy( &dmn->memb_id, &Temp_buf[num_bytes], sizeof(membership_id) );
            num_bytes += sizeof(membership_id);
            memcpy( &num_memb, &Temp_buf[num_bytes], sizeof(int16u) );
            num_bytes += sizeof(int16u);
            if( !Same_endian( head_ptr->type ) )
            {
                dmn->proc_id = Flip_int32( dmn->proc_id );
                dmn->memb_id.proc_id = Flip_int32( dmn->memb_id.proc_id );
                dmn->memb_id.time = Flip_int32( dmn->memb_id.time );
                num_memb = Flip_int16( num_memb );
            }
            IP_to_STR( dmn->proc_id, ip_string );
            Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \tdaemon with proc_id %s\n", ip_string );
            IP_to_STR( dmn->memb_id.proc_id, ip_string );
            Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \t\twith memb_id (%s, %d)\n",
                    ip_string, dmn->memb_id.time );
            Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \t\twith %u members:\n", num_memb );

            if (stdskl_construct(&dmn->MembersList, sizeof(member*), 0, G_compare_nameptr) != 0) {
                Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
            }

            if (stdskl_put(&grp->DaemonsList, NULL, &dmn, NULL, STDFALSE) != 0) {
                Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
            }

            /* A daemon whose memb_id differs from the group's id marks the group changed. */
            if( !grp->changed &&
                !Memb_is_equal( dmn->memb_id, grp->grp_id.memb_id ) )
                grp->changed = TRUE;

            /* creating members */
            for( j = 0; j < num_memb; ++j )
            {
                mbr = new( MEMBER );
                memcpy( mbr->name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
                Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \t\t%s\n", mbr->name );
                num_bytes += MAX_GROUP_NAME;

                /* this inserts into MembersList hinting that the insertion should be at the end of the list (faster if input sorted) */

                if (stdskl_put(&dmn->MembersList, stdskl_end(&dmn->MembersList, &it), &mbr, NULL, STDTRUE) != 0) {
                    Alarmp( SPLOG_FATAL, GROUPS, "%s: %d: memory allocation failed\n", __FILE__, __LINE__ );
                }
            }
            grp->num_members += num_memb;
        }
    }
    return( 0 );
}
2465
/* TODO: G_analize_groups is O(n) when num_groups = 1 and O(n^2) when
   num_groups > 1 (multi_group_multicast) in the number of targeted
   mailboxes. In the latter case, if n is large, an O(n log n) solution
   leveraging a skiplist would be preferable.
*/
2471
/* Map a list of target group names to the indices of the local sessions that
 * should receive a message sent to them.
 *
 * num_groups       - number of entries in target_groups.
 * target_groups    - group names; a leading '#' marks a private group.
 * target_sessions  - output array; receives one session index per distinct
 *                    local mailbox, as returned by Sess_get_session_index.
 *
 * Unknown groups, illegal private names, private groups of other daemons and
 * private groups with no local session are skipped.  Calls Alarmp FATAL if a
 * regular group is targeted while Gstate is neither GOP nor GTRANS.
 *
 * Returns the number of entries written to target_sessions.
 * Uses a function-static mailbox buffer, so it is not reentrant. */
int G_analize_groups( int num_groups, char target_groups[][MAX_GROUP_NAME], int target_sessions[] )
{
    static mailbox mboxes[MAX_SESSIONS];
    int num_mbox;
    mailbox *bigbox_ptr;
    mailbox *bigend_ptr;
    mailbox *litbox_ptr;
    mailbox *litend_ptr;
    group *grp;
    char proc_name[MAX_PROC_NAME];
    char private_name[MAX_PRIVATE_NAME+1];
    int ses;
    int ret;
    int i;
    int *orig_target_sessions = target_sessions;

    /* collect the target local mailboxen */

    num_mbox = 0;
    litbox_ptr = NULL;
    litend_ptr = NULL;

    for ( i=0; i < num_groups; ++i )
    {
        /* [litbox_ptr, litend_ptr) is set to the mailboxes targeted by this group. */
        if( target_groups[i][0] != '#' )
        {
            /* regular group */
            grp = G_get_group( target_groups[i] );

            if( grp == NULL ) {
                continue;
            }

            if( Gstate == GOP || Gstate == GTRANS ) {
                litbox_ptr = (mailbox*) grp->mboxes.begin; /* point litbox pointer at grp->mboxes */
                litend_ptr = litbox_ptr + grp->mboxes.size;

            } else {
                Alarmp( SPLOG_FATAL, GROUPS, "G_analize_groups: Gstate is %d\n", Gstate );
            }

        } else {
            /* private group */
            ret = G_private_to_names( target_groups[i], private_name, proc_name );

            /* Illegal group OR this private group is not local OR we have no such session */
            if( ret < 0 || strcmp( My.name, proc_name ) != 0 || (ses = Sess_get_session( private_name )) < 0) {
                continue;
            }

            litbox_ptr = &Sessions[ ses ].mbox; /* just point litbox pointer at the session's mbox */
            litend_ptr = litbox_ptr + 1;
        }

        /* NOTE: the following code assumes that grp->mboxes contains no duplicates */

        if (num_groups == 1) { /* no need to do extra copy work if only one group -> just use litbox 'array' directly */
            break;
        }

        if (num_mbox != 0) { /* mboxes contains some entries already */
            /* bigend_ptr is fixed before the loop: new entries are only checked
             * against mailboxes gathered from earlier groups, not against ones
             * just appended from this group (hence the no-duplicates NOTE). */
            bigend_ptr = mboxes + num_mbox;

            for (; litbox_ptr != litend_ptr; ++litbox_ptr)
            {
                mailbox m = *litbox_ptr;

                /* linear search over mboxes for 'm' */

                for (bigbox_ptr = mboxes; bigbox_ptr != bigend_ptr && *bigbox_ptr != m; ++bigbox_ptr);

                if (bigbox_ptr == bigend_ptr) {
                    mboxes[ num_mbox++ ] = m;
                }
            }

        } else {
            num_mbox = (int) (litend_ptr - litbox_ptr); /* if num_mbox is (still) zero, then adopt litbox 'array' */
            memcpy(mboxes, litbox_ptr, num_mbox * sizeof(mailbox));
        }
    }

    /* convert mailboxes to sessions */

    /* With a single group that was skipped above, litbox_ptr/litend_ptr are
     * both still NULL and the loop below writes nothing. */
    if (num_groups == 1) { /* on non-multi group multicast just use litbox 'array' in place */
        bigbox_ptr = litbox_ptr;
        bigend_ptr = litend_ptr;

    } else { /* otherwise use unique entries we've built in mboxes */
        bigbox_ptr = mboxes;
        bigend_ptr = mboxes + num_mbox;
    }

    for (; bigbox_ptr != bigend_ptr; ++bigbox_ptr, ++target_sessions)
    {
        *target_sessions = Sess_get_session_index( *bigbox_ptr );
    }

    return (int) (target_sessions - orig_target_sessions);
}
2572
/* Recompute grp->grp_mask, the bit mask of configuration segments hosting a
 * daemon in grp->DaemonsList: for each such daemon, bit (seg_index % 32) of
 * word (seg_index / 32) is set.  The result is logged at SPLOG_INFO, prefixed
 * by func_name (the caller's name).  Only compiled in when
 * SPREAD_PROTOCOL == 4; otherwise this function does nothing. */
static void G_compute_group_mask( group *grp, char *func_name )
{
#if (SPREAD_PROTOCOL == 4)
    int             i;
    daemon_members *dmn;
    proc            p;
    stdit           it;

    for( i = 0; i < 4; i++ )
    {
        grp->grp_mask[i] = 0;
    }
    for (stdskl_begin(&grp->DaemonsList, &it); !stdskl_is_end(&grp->DaemonsList, &it); stdskl_it_next(&it))
    {
        dmn = *(daemon_members**) stdskl_it_key(&it);
        /* NOTE(review): the return value of Conf_proc_by_id is not checked;
         * the daemon is assumed to be present in the configuration. */
        Conf_proc_by_id( dmn->proc_id, &p );

        /* Use an unsigned shift.  The former signed "temp *= 2" loop overflowed
         * an int (undefined behavior) when seg_index % 32 == 31. */
        grp->grp_mask[p.seg_index/32] |= (int32u) 1 << (p.seg_index % 32);
    }
    Alarmp( SPLOG_INFO, GROUPS, "%s: Mask for group %s set to %x %x %x %x\n", func_name,
            grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
#endif
}
2604
/* Compute into grp_mask[0..3] the set of configuration segments that must
 * see a message addressed to target_groups.  Bit (seg_index % 32) of word
 * (seg_index / 32) stands for one segment.
 *   - private group "#priv#proc": the segment of daemon proc (skipped if the
 *     name is illegal or proc is not in the configuration);
 *   - unknown regular group: this daemon's segment only;
 *   - known regular group (Gstate GOP or GTRANS): the group's grp_mask plus
 *     this daemon's segment; any other Gstate is a fatal error. */
void G_set_mask( int num_groups, char target_groups[][MAX_GROUP_NAME], int32u *grp_mask )
{
    group *grp;
    char   proc_name[MAX_PROC_NAME];
    char   private_name[MAX_PRIVATE_NAME+1];
    int    ret;
    int    i, j;
    proc   p;

    for( i = 0; i < 4; i++ )
    {
        grp_mask[i] = 0;
    }

    for( i = 0; i < num_groups; i++ )
    {
        if( target_groups[i][0] == '#' )
        {
            /* private group */
            ret = G_private_to_names( target_groups[i], private_name, proc_name );

            /* Illegal group */
            if( ret < 0 ) continue;

            /* Unknown daemon: p would be left unset and p.seg_index used as an
             * array index.  Nobody can receive it, so no segment bit is needed.
             * (Conf_proc_by_name is assumed to return a negative value when
             * the name is not in the configuration. Confirm.) */
            if( Conf_proc_by_name( proc_name, &p ) < 0 ) continue;

            grp_mask[p.seg_index/32] |= (int32u) 1 << (p.seg_index % 32);

        }else{
            /* regular group */
            grp = G_get_group( target_groups[i] );
            if( grp == NULL )
            {
                p = Conf_my();
                grp_mask[p.seg_index/32] |= (int32u) 1 << (p.seg_index % 32);
            }
            else if(( Gstate == GOP )||(Gstate == GTRANS))
            {
                for( j = 0; j < 4; j++ )
                {
                    grp_mask[j] |= grp->grp_mask[j];
                }
                p = Conf_my();
                grp_mask[p.seg_index/32] |= (int32u) 1 << (p.seg_index % 32);

            }else Alarmp( SPLOG_FATAL, GROUPS, "G_set_mask: Gstate is %d\n", Gstate );
        }
    }
}
2670
G_private_to_names(char * private_group_name,char * private_name,char * proc_name)2671 int G_private_to_names( char *private_group_name, char *private_name, char *proc_name )
2672 {
2673 char name[MAX_GROUP_NAME];
2674 char *pn, *prvn;
2675 unsigned int priv_name_len, proc_name_len;
2676 int i,legal_private_name;
2677
2678 memcpy(name, private_group_name, MAX_GROUP_NAME );
2679 proc_name_len = 0; /* gcc not smart enough to detect that proc_name_len is always initialized when used */
2680
2681 pn = strchr(&name[1], '#');
2682 if (pn != NULL)
2683 {
2684 pn[0] = '\0';
2685 proc_name_len = strlen( &(pn[1]));
2686 }
2687 priv_name_len = strlen( &(name[1]));
2688 if ( (pn == NULL) || (name[0] != '#' ) ||
2689 ( priv_name_len > MAX_PRIVATE_NAME) ||
2690 ( priv_name_len < 1 ) ||
2691 ( proc_name_len >= MAX_PROC_NAME ) ||
2692 ( proc_name_len < 1 ) )
2693 {
2694 Alarmp( SPLOG_ERROR, GROUPS, "G_private_to_names: Illegal private_group_name %s (priv, proc)\n",
2695 private_group_name );
2696 return( ILLEGAL_GROUP );
2697 }
2698 /* start strings at actual beginning */
2699 pn++;
2700 pn[proc_name_len] = '\0';
2701 prvn = &name[1];
2702 legal_private_name = 1;
2703 for( i=0; i < priv_name_len; i++ )
2704 if( prvn[i] <= '#' ||
2705 prvn[i] > '~' )
2706 {
2707 legal_private_name = 0;
2708 prvn[i] = '.';
2709 }
2710 for( i=0; i < proc_name_len; i++ )
2711 if( pn[i] <= '#' ||
2712 pn[i] > '~' )
2713 {
2714 legal_private_name = 0;
2715 pn[i] = '.';
2716 }
2717 if( !legal_private_name )
2718 {
2719 Alarmp( SPLOG_ERROR, GROUPS, "G_private_to_names: Illegal private_group_name characters (%s) (%s)\n",
2720 prvn, pn );
2721 return( ILLEGAL_GROUP );
2722 }
2723 /* copy name components including null termination */
2724 memcpy( private_name, prvn, priv_name_len + 1 );
2725 memcpy( proc_name, pn, proc_name_len + 1 );
2726 return( 1 );
2727 }
2728
G_print()2729 static void G_print()
2730 {
2731 group *grp;
2732 daemon_members *dmn;
2733 member *mbr;
2734 int i, j, k;
2735 stdit git, dit, mit;
2736
2737 Alarmp( SPLOG_PRINT, GROUPS, "++++++++++++++++++++++\n" );
2738 Alarmp( SPLOG_PRINT, GROUPS, "Num of groups: %d\n", Num_groups );
2739
2740 for (i = 0, stdskl_begin(&GroupsList, &git); !stdskl_is_end(&GroupsList, &git); ++i, stdskl_it_next(&git))
2741 {
2742 grp = *(group**) stdskl_it_key(&git);
2743 Alarmp( SPLOG_PRINT, GROUPS, "[%d] group %s with %d members:\n", i+1, grp->name, grp->num_members );
2744
2745 for (j = 0, stdskl_begin(&grp->DaemonsList, &dit); !stdskl_is_end(&grp->DaemonsList, &dit); ++j, stdskl_it_next(&dit))
2746 {
2747 dmn = *(daemon_members**) stdskl_it_key(&dit);
2748
2749 for (k = 0, stdskl_begin(&dmn->MembersList, &mit); !stdskl_is_end(&dmn->MembersList, &mit); ++k, stdskl_it_next(&mit))
2750 {
2751 mbr = *(member**) stdskl_it_key(&mit);
2752 Alarmp( SPLOG_PRINT, GROUPS, "\t[%d] %s\n", k+1, mbr->name );
2753 }
2754 }
2755 Alarmp( SPLOG_PRINT, GROUPS, "----------------------\n" );
2756 }
2757 }
2758
G_get_num_local(char * group_name)2759 int G_get_num_local( char *group_name )
2760 {
2761 group *grp = G_get_group( group_name );
2762 if( grp == NULL ) return 0;
2763 return stdarr_size(&grp->mboxes);
2764 }
2765
2766 /* Add new members to my synced set. */
G_add_to_synced_set(synced_set * sset)2767 static void G_add_to_synced_set( synced_set *sset ) {
2768 synced_set temp;
2769 int32u i = 0, j = 0;
2770 int index_l = -1, index_r = -1;
2771 proc dummy_proc;
2772
2773 temp.size = 0;
2774 while( i < MySyncedSet.size || j < sset->size ) {
2775 if( i < MySyncedSet.size && index_l == -1 ) {
2776 index_l = Conf_proc_by_id( MySyncedSet.proc_ids[i], &dummy_proc );
2777 if( index_l == -1 )
2778 Alarmp( SPLOG_FATAL, GROUPS, "G_add_to_synced_set: proc_id %u not in conf\n",
2779 MySyncedSet.proc_ids[i] );
2780 }
2781 if( j < sset->size && index_r == -1 ) {
2782 index_r = Conf_proc_by_id( sset->proc_ids[j], &dummy_proc );
2783 if( index_r == -1 )
2784 Alarmp( SPLOG_FATAL, GROUPS, "G_add_to_synced_set: proc_id %u not in conf\n",
2785 sset->proc_ids[j] );
2786 }
2787 if( ( index_l < index_r && index_l != -1 ) || index_r == -1 ) {
2788 temp.proc_ids[temp.size++] = MySyncedSet.proc_ids[i++];
2789 index_l = -1;
2790 } else if( ( index_r < index_l && index_r != -1 ) || index_l == -1 ) {
2791 temp.proc_ids[temp.size++] = sset->proc_ids[j++];
2792 index_r = -1;
2793 } else {
2794 Alarmp( SPLOG_FATAL, GROUPS, "G_add_to_synced_set: intersection isn't empty --"
2795 " equal procs %u and %u\n", MySyncedSet.proc_ids[i], sset->proc_ids[j] );
2796 }
2797 }
2798 memcpy( &MySyncedSet, &temp, sizeof(synced_set) );
2799 }
2800
2801 /* Remove members who aren't in the membership. */
G_update_synced_set(synced_set * s,configuration * memb_p)2802 static void G_update_synced_set( synced_set *s, configuration *memb_p ) {
2803 bool ret;
2804 ret = G_update_synced_set_status( s, memb_p );
2805 return;
2806 }
2807
2808 /* Remove members who aren't in the membership and
2809 * return true if group changed membership or false otherwise */
G_update_synced_set_status(synced_set * s,configuration * memb_p)2810 static bool G_update_synced_set_status( synced_set *s, configuration *memb_p )
2811 {
2812 int i, j = 0;
2813 bool changed = FALSE;
2814
2815 for( i = 0; i < s->size; ++i )
2816 if( Conf_id_in_conf( memb_p, s->proc_ids[i] ) >= 0 )
2817 s->proc_ids[j++] = s->proc_ids[i];
2818 /* If we lost members. */
2819 if( j != s->size ) {
2820 s->size = j;
2821 changed = TRUE;
2822 }
2823 return changed;
2824 }
2825
2826 /* Print the synced set. For debugging. */
G_print_synced_set(int priority,synced_set * s,char * func_name)2827 static void G_print_synced_set( int priority, synced_set *s, char *func_name )
2828 {
2829 int i;
2830 proc p;
2831 Alarmp( priority, GROUPS, "%s: Synced Set (with %u members):\n", func_name, s->size );
2832 for( i = 0; i < s->size; ++i ) {
2833 Conf_proc_by_id( s->proc_ids[i], &p );
2834 Alarmp( priority, GROUPS, "%s: \t%s\n", func_name, p.name );
2835 }
2836 }
2837
2838 /* Eliminate the partitioned daemons of a group. */
G_eliminate_partitioned_daemons(group * grp)2839 static void G_eliminate_partitioned_daemons( group *grp )
2840 {
2841 bool ret;
2842 ret = G_eliminate_partitioned_daemons_status( grp );
2843 return;
2844 }
2845
2846 /* Eliminate the partitioned daemons of a group. Return true if we changed the
2847 * group. */
G_eliminate_partitioned_daemons_status(group * grp)2848 static bool G_eliminate_partitioned_daemons_status( group *grp )
2849 {
2850 daemon_members *dmn;
2851 bool group_changed = FALSE;
2852 int needed;
2853 stdit it;
2854
2855 for (stdskl_begin(&grp->DaemonsList, &it); !stdskl_is_end(&grp->DaemonsList, &it); )
2856 {
2857 dmn = *(daemon_members**) stdskl_it_key(&it);
2858 stdskl_it_next(&it); /* NOTE: advance here to protect against potential removal below */
2859
2860 needed = 0;
2861 /* The first condition is sufficient, but we can optimize a bit this way. */
2862 if( Gstate == GGT ) /* Called in G_handle_reg_memb after we got a cascading transitional */
2863 {
2864 if( Conf_id_in_conf( &Trans_memb, dmn->proc_id ) == -1 )
2865 {
2866 needed = 1;
2867 } else {
2868 needed = 0;
2869 }
2870 } else { /* Called because we got the non-cascading regular membership */
2871 if( Is_partitioned_daemon( dmn ) )
2872 {
2873 needed = 1;
2874 } else {
2875 needed = 0;
2876 }
2877 }
2878 if( needed )
2879 {
2880 /* discard this daemon and its members - proc no longer in membership */
2881 G_remove_daemon( grp, dmn );
2882 group_changed = TRUE;
2883 }
2884 }
2885 return group_changed;
2886 }
2887
2888 /* This function is only called when we handle a cascading transitional membership.
2889 * Gstate should be GGATHER, about to change to GGT */
G_check_if_changed_by_cascade(group * grp)2890 static bool G_check_if_changed_by_cascade( group *grp )
2891 {
2892 daemon_members *dmn;
2893 bool group_changed = FALSE;
2894 stdit it;
2895
2896 for (stdskl_begin(&grp->DaemonsList, &it); !stdskl_is_end(&grp->DaemonsList, &it); stdskl_it_next(&it))
2897 {
2898 dmn = *(daemon_members**) stdskl_it_key(&it);
2899 if( Conf_id_in_conf( &Trans_memb, dmn->proc_id ) == -1 )
2900 {
2901 group_changed = TRUE;
2902 break;
2903 }
2904 }
2905 return group_changed;
2906 }
2907
/* Unlink daemon entry 'dmn' from grp->DaemonsList, subtract its members from
 * grp->num_members, and free the daemon together with all of its member
 * records.  If 'dmn' is not found in the group, a SPLOG_FATAL is raised. */
static void G_remove_daemon( group *grp, daemon_members *dmn )
{
        stdit  pos;
        stdit *found;
        int32 *key = &dmn->proc_id;     /* DaemonsList is searched by proc id */

        found = stdskl_find( &grp->DaemonsList, &pos, &key );
        if( stdskl_is_end( &grp->DaemonsList, found ) )
        {
                Alarmp( SPLOG_FATAL, GROUPS, "G_remove_daemon: invalid daemon(%d.%d.%d.%d) removal from group(%s)\n",
                        IP1(dmn->proc_id), IP2(dmn->proc_id), IP3(dmn->proc_id), IP4(dmn->proc_id), grp->name );
        }
        stdskl_erase( &grp->DaemonsList, &pos );

        grp->num_members -= stdskl_size( &dmn->MembersList );

        /* Free each member record.  This leaves dangling keys in MembersList,
         * which is only safe because the list is destructed right after. */
        for( stdskl_begin( &dmn->MembersList, &pos ); !stdskl_is_end( &dmn->MembersList, &pos ); stdskl_it_next( &pos ) )
                dispose( *(member**) stdskl_it_key( &pos ) );

        stdskl_destruct( &dmn->MembersList );
        dispose( dmn );
}
2929
2930 /* Remove a group that is known to be empty. */
G_remove_group(group * grp)2931 static void G_remove_group( group *grp )
2932 {
2933 stdit it;
2934
2935 assert( stdskl_empty(&grp->DaemonsList) );
2936 assert( stdarr_empty(&grp->mboxes) );
2937
2938 if (stdskl_is_end(&GroupsList, stdskl_find(&GroupsList, &it, &grp))) {
2939 Alarmp( SPLOG_FATAL, GROUPS, "G_remove_group: invalid group removal(%s)\n", grp->name );
2940 }
2941
2942 stdskl_erase(&GroupsList, &it);
2943
2944 stdskl_destruct(&grp->DaemonsList);
2945 stdarr_destruct(&grp->mboxes);
2946 dispose(grp);
2947 Num_groups--;
2948 GlobalStatus.num_groups = Num_groups;
2949 }
2950
2951 /* Remove a local mailbox from grp->mboxes */
G_remove_mailbox(group * grp,mailbox m)2952 static void G_remove_mailbox( group *grp, mailbox m )
2953 {
2954 mailbox *mbox_ptr = (mailbox*) grp->mboxes.begin; /* address of first entry */
2955 mailbox *endbox_ptr = mbox_ptr + grp->mboxes.size; /* address of 1 past last entry */
2956
2957 for (; mbox_ptr != endbox_ptr && *mbox_ptr != m; ++mbox_ptr); /* linear search over grp->mboxes */
2958
2959 if (mbox_ptr == endbox_ptr) {
2960 Alarmp( SPLOG_FATAL, GROUPS, "G_remove_mailbox: didn't find local mailbox %d in group '%s'\n", m, grp->name );
2961 }
2962
2963 --endbox_ptr; /* point at last entry in grp->mboxes */
2964 *mbox_ptr = *endbox_ptr; /* overwrite 'm' entry w/ last entry */
2965
2966 stdarr_pop_back(&grp->mboxes); /* pop off redundant last entry */
2967 }
2968
2969 /* All non-partitioned daemons get the membership ID component of the
2970 * groups current group ID. */
G_update_daemon_memb_ids(group * grp)2971 static void G_update_daemon_memb_ids( group *grp )
2972 {
2973 stdit it;
2974
2975 for (stdskl_begin(&grp->DaemonsList, &it); !stdskl_is_end(&grp->DaemonsList, &it); stdskl_it_next(&it))
2976 {
2977 daemon_members *dmn = *(daemon_members**) stdskl_it_key(&it);
2978
2979 if( Is_established_daemon( dmn ) ) {
2980 dmn->memb_id = grp->grp_id.memb_id;
2981 }
2982 }
2983 }
2984
/* Log group id 'g' at 'priority', prefixed by 'func_name': the proc id
 * (formatted by IP_to_STR), the membership time, and the index. */
static void G_print_group_id( int priority, group_id g, char *func_name )
{
        /* 16 bytes holds "255.255.255.255" plus NUL, assuming IP_to_STR
         * writes a dotted quad -- NOTE(review): confirm against IP_to_STR. */
        char addr[16];

        IP_to_STR( g.memb_id.proc_id, addr );
        Alarmp( priority, GROUPS, "%s: Group_id {Proc ID: %s, Time: %d, Index: %d}\n",
                func_name, addr, g.memb_id.time, g.index );
}
2993