1 /*
2 * Copyright (C) 2002-2009, Edmundo Albuquerque de Souza e Silva.
3 *
4 * This file may be distributed under the terms of the Q Public License
5 * as defined by Trolltech AS of Norway and appearing in the file
6 * LICENSE.QPL included in the packaging of this file.
7 *
8 * THIS FILE IS PROVIDED AS IS WITH NO WARRANTY OF ANY KIND, INCLUDING
9 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
10 * PURPOSE. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
11 * INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
12 * FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
13 * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
14 * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 *
16 */
17
18 /***************************************************************************
19 rmevent.c
20 -------------------
21 begin : May 2001
22 Authors : Jorge Allyson Azevedo
23 Milena Scanferla
24 Daniel Sadoc
25 email : {allyson,milena,sadoc}@land.ufrj.br
26 ***************************************************************************/
27
28 #ifndef RMEVENT_C
29 #define RMEVENT_C
30
31 #include <signal.h>
32 #include <stdlib.h>
33 #include <sys/time.h>
34 #include <unistd.h>
35 #include <errno.h>
36 #include <pthread.h>
37
38
39 #include "rmstruct.h"
40 #include "rmevent.h"
41 #include "rminternals.h"
42 #include "rmwinmask.h"
43
44 #include "rmcast.h"
45
46 /*---------------- Global variables ------------------------------------------------------------------*/
47
48 #ifdef DSUPPRESSION
49 extern FILE *suppressionfile;
50 #endif
51 extern pthread_t rcv_thread, signal_handler_thread;
52
53 extern GLOBAL_OPTIONS rmcast_options;
54
55 #ifndef RANDOM_TIMERS
56 static long DEFAULT_TIMER_VALUE [7] = { 0,
57 1000000,
58 2000000,
59 1000000,
60 10000000,
61 5000000,
62 0 };
63 #endif
64
65 char *EVENT_ACTION_NAME[9] = { "ZERO_ACTION",
66 "NAK_SND_WAIT",
67 "RET_RCV_WAIT",
68 "RET_SND_WAIT",
69 "REF_SND_WAIT",
70 "LEV_GRP_WAIT",
71 "SUPPRESSED_NAK",
72 "RET_RCV_EXPIRED",
73 "UNKNOWN"};
74
75 #ifdef SOLARIS
76 pthread_mutex_t event_list_mutex;
77 #else
78 pthread_mutex_t event_list_mutex = PTHREAD_MUTEX_INITIALIZER;
79 #endif
80
81 extern CACHE *cache;
82 extern USER_INFO local_user_info;
83
84 /*
85 * inline means that the precompiler will replace the calling of getRemaining time with the
86 * code of the function, i.e., the function will not exist in compile time
87 */
88 inline void getRemainingTime(EVENT_LIST **el, long double*remaining_time, char *msg);
89
90 EVENT_LIST * eventListFind2(EVENT_LIST **el1,MEMBER_ID *member_id, char action, int sn, EVENT_LIST **antNode);
91
92 extern GLOBAL_OPTIONS rmcast_options;
93
94 /*---------------- Auxiliary functions to manipulate the timer ---------------------------------------*/
95
96
97 /***************************************************************************************
98 *
99 * float generateSample(char distribution)
100 *
101 * Generates a random sample.
102 *
103 * Arguments: distribution, the distribution of the random var (not yet implemented).
104 *
105 * Returns: a random number between 0 and 1.
106 *
107 ***************************************************************************************/
108
generateSample(char distribution)109 float generateSample(char distribution)
110 {
111 double sample=0,uniform=0;
112 double e,lambda;
113
114 switch(distribution)
115 {
116 case UNIFORM:
117
118 sample=((double)random()/(double)RAND_MAX);
119
120 break;
121
122 case EXPONENTIAL:
123
124 e = exp( 1 );
125
126 /* Lambda must be log(number of members of the multicast group) + 1 */
127 lambda = log( cacheCountMembers( cache,CACHE_STATUS_ACTIVE ) ) + 1;
128 uniform = ((double)random()/(double)RAND_MAX);
129 sample = -(log(uniform)/lambda);
130
131 break;
132
133 default:
134
135 fprintf(stderr,"generateSample ERROR: unknown distribution=%c",distribution);
136 }
137 return ( sample );
138 }
139
140 /***************************************************************************************
141 *
142 * int generateTimerValue(MEMBER_ID member_id, char action, char distribution)
143 *
144 * Generates a timer value corresponding to the defined action. This value indicates
145 * in how many microseconds the action will be executed.
146 *
147 * Arguments: member_id, member id of the member related to the event
148 * action, the action to be executed.
149 * distribution, the distribution of the random var (not yet implemented).
150 *
151 * Returns: the timer value, in microseconds.
152 *
153 ***************************************************************************************/
154
155
generateTimerValue(char action,char distribution,char * sender_ip)156 int generateTimerValue(char action, char distribution, char *sender_ip)
157 {
158 int retval;
159 int estimated_delay, /* estimated one-way delay */
160 timerlow, /* lower bound for the timer interval */
161 timerhigh; /* higher bound for the timer interval */
162
163 retval = 0;
164
165 switch((int)action){
166
167 case NAK_SND_WAIT:
168
169 #ifndef RANDOM_TIMERS
170 retval = DEFAULT_TIMER_VALUE[(int)action];
171 #else
172 if (RM_getHostDelay(sender_ip,&estimated_delay) > 0){
173 timerlow= rmcast_options.timer_paramA*estimated_delay;
174 timerhigh= (rmcast_options.timer_paramA+rmcast_options.timer_paramB)*estimated_delay;
175 retval = (int)1000*( (timerhigh - timerlow) * generateSample(distribution) + timerlow);
176 /* fprintf(stderr,"Using timer related to %s:%d %d %d %d\n",sender_ip,(int)action,timerlow,timerhigh,retval); */
177 }
178 else{
179 RM_getHostDelay("DEFAULT",&estimated_delay);
180 timerlow= rmcast_options.timer_paramA*estimated_delay;
181 timerhigh= (rmcast_options.timer_paramA+rmcast_options.timer_paramB)*estimated_delay;
182 retval = (int)1000*( (timerhigh - timerlow) * generateSample(distribution) + timerlow);
183 /* fprintf(stderr,"Using timer related to %s:%d %d %d %d\n",sender_ip,(int)action,timerlow,timerhigh,retval); */
184 }
185 #endif
186
187 break;
188
189 case RET_RCV_WAIT:
190 #ifndef RANDOM_TIMERS
191 retval = DEFAULT_TIMER_VALUE[(int)action];
192 #else
193 if (RM_getHostDelay(sender_ip,&estimated_delay) > 0){
194 timerlow= rmcast_options.timer_paramC*estimated_delay;
195 timerhigh= (rmcast_options.timer_paramC+rmcast_options.timer_paramD)*estimated_delay;
196 retval = (int)1000*( (timerhigh - timerlow) * generateSample(distribution) + timerlow);
197 /* fprintf(stderr,"Using timer related to %s:%d %d %d %d\n",sender_ip,(int)action,timerlow,timerhigh,retval); */
198 }
199 else{
200 RM_getHostDelay("DEFAULT",&estimated_delay);
201 timerlow= rmcast_options.timer_paramA*estimated_delay;
202 timerhigh= (rmcast_options.timer_paramA+rmcast_options.timer_paramB)*estimated_delay;
203 retval = (int)1000*( (timerhigh - timerlow) * generateSample(distribution) + timerlow);
204 /* fprintf(stderr,"Using timer related to %s:%d %d %d %d\n",sender_ip,(int)action,timerlow,timerhigh,retval); */
205 }
206
207 #endif
208
209 break;
210
211 case RET_SND_WAIT:
212 #ifndef RANDOM_TIMERS
213 retval = DEFAULT_TIMER_VALUE[(int)action];
214 #else
215 if (RM_getHostDelay(sender_ip,&estimated_delay) > 0){
216 timerlow= rmcast_options.timer_paramE*estimated_delay;
217 timerhigh= (rmcast_options.timer_paramE+rmcast_options.timer_paramF)*estimated_delay;
218 retval = (int)1000*( (timerhigh - timerlow) * generateSample(distribution) + timerlow);
219 /* fprintf(stderr,"Using timer related to %s:%d %d %d %d\n",sender_ip,(int)action,timerlow,timerhigh,retval); */
220 }
221 else{
222 RM_getHostDelay("DEFAULT",&estimated_delay);
223 timerlow= rmcast_options.timer_paramC*estimated_delay;
224 timerhigh= (rmcast_options.timer_paramC+rmcast_options.timer_paramD)*estimated_delay;
225 retval = (int)1000*( (timerhigh - timerlow) * generateSample(distribution) + timerlow);
226 /* fprintf(stderr,"Using timer related to %s:%d %d %d %d\n",sender_ip,(int)action,timerlow,timerhigh,retval); */
227 }
228 #endif
229
230 break;
231
232 case REF_SND_WAIT:
233
234 retval = rmcast_options.refresh_timer*1000000;
235
236 break;
237
238 case LEV_GRP_WAIT:
239
240 retval = rmcast_options.leave_group_wait_time;
241
242 break;
243
244 default:
245 fprintf(stderr,"generateTimerValue Warning: Unknow action=%d\n",action);
246 }
247
248 return ( retval );
249 }
250
251 /***************************************************************************************
252 *
253 * inline void getRemainingTime(EVENT_LIST **el, long double*remaining_time, char *msg)
254 *
255 * Gets the remaining time to execute a specific action in the event list.
256 *
257 * Arguments: el, the event list;
258 * remaining_time, the remaining time to execute the action;
259 * msg, an error message to be shown if remaining_time <= 0.
260 *
261 ***************************************************************************************/
262
getRemainingTime(EVENT_LIST ** el,long double * remaining_time,char * msg)263 inline void getRemainingTime(EVENT_LIST **el, long double *remaining_time, char *msg)
264 {
265 struct timeval current_time;
266
267 gettimeofday(¤t_time, 0);
268
269 (*remaining_time) = (*el)->timer_value - ( getTimerValue(&(current_time))- getTimerValue(&((*el)->last_update_time)) );
270
271 if (*remaining_time <= 0)
272 {
273 #ifdef DEBUG_EVENTS
274 fprintf(stderr,"Error: remaining time < 0\n");
275 fprintf(stderr,"\tRemaining time: %.0Lf \n",*remaining_time);
276 fprintf(stderr,"\tCurrent time : %.0Lf \n",getTimerValue(&(current_time)) );
277 fprintf(stderr,"\tLast update time : %.0Lf \n",getTimerValue(&((*el)->last_update_time)) );
278 if (msg!=NULL)
279 fprintf(stderr,"\tmsg : %s \n",msg );
280 #endif
281 (*remaining_time) = 0;
282 }
283 }
284
285
286 /***************************************************************************************
287 *
288 * void setTimerValue(struct itimerval *value, long int time)
289 *
290 * Converts the time from microseconds to struct itimerval.
291 *
292 * Arguments: value, where the converted value will be stored (a struct itimerval);
293 * time, the time in microseconds.
294 *
295 ***************************************************************************************/
296
setTimerValue(struct itimerval * value,long int time)297 void setTimerValue(struct itimerval *value, long int time)
298 {
299 (*value).it_interval.tv_sec = 0;
300 (*value).it_interval.tv_usec = 0;
301
302 (*value).it_value.tv_sec = time/1000000;
303 (*value).it_value.tv_usec = time%1000000;
304 }
305
306 /***************************************************************************************
307 *
308 * long double getTimerValue(struct timeval *value )
309 *
310 * Converts the time from struct timeval to microseconds.
311 *
312 * Arguments: value, the struct timeval source.
313 *
314 * Returns: the time, in microseconds.
315 *
316 ***************************************************************************************/
317
318
getTimerValue(struct timeval * value)319 long double getTimerValue(/* struct itimerval *value */ struct timeval *value )
320 {
321 return (((long double)(*value).tv_sec*1000000)+((long double)(*value).tv_usec));
322 }
323
324 /*---------------- Main functions to manipulate the event list ---------------------------------------*/
325
326 /***************************************************************************************
327 *
328 * void eventListInit(EVENT_LIST **el)
329 *
330 * Initialize the event list.
331 *
332 * Arguments: el, the event list;
333 *
334 ***************************************************************************************/
335
336
eventListInit(EVENT_LIST ** el)337 void eventListInit(EVENT_LIST **el)
338 {
339 #ifdef SOLARIS
340 pthread_mutex_init(&event_list_mutex,NULL);
341 #endif
342
343 *el = NULL;
344
345 /*
346 * FIXME under unicast mode, it doesn't make sense to have long timer values
347 * if (rmcast_options.transmission_mode == UNICAST)
348 * {
349 *
350 * int i;
351 *
352 * for (i=0; i<6; i++)
353 * DEFAULT_TIMER_VALUE [i] /= 10000;
354 * }
355 */
356
357 }
358
359 /***************************************************************************************
360 *
361 * int eventListAllocNode(EVENT_LIST **el_node, MEMBER_ID *member, char action,
362 * int sn, long timer_value)
363 *
364 * Allocates a event list node, and fills it with specified info.
365 *
366 * Arguments: el_node, the node which will be created;
367 * member, action, sn and timer_value hold info. to be stored in new node.
368 *
369 * Returns: 0 on error, 1 on success.
370 *
371 ***************************************************************************************/
372
eventListAllocNode(EVENT_LIST ** el_node,MEMBER_ID * member,char action,int sn,long timer_value)373 int eventListAllocNode(EVENT_LIST **el_node, MEMBER_ID *member,
374 char action, int sn, long timer_value)
375 {
376 if ((*el_node = (EVENT_LIST*)malloc(sizeof(EVENT_LIST)))==NULL)
377 {
378 return (0);
379 }
380 (*el_node)->member_id = member;
381 (*el_node)->action = action;
382 (*el_node)->timer_value = timer_value;
383 (*el_node)->sn = sn;
384 gettimeofday( &((*el_node)->last_update_time), 0) ;
385 (*el_node)->next = NULL;
386 return (1);
387 }
388
389
390 /***************************************************************************************
391 *
392 * int eventListInsert(EVENT_LIST **el, MEMBER_ID *member_id, char *sender_ip, char action, int sn)
393 *
394 * Inserts an event node in the event list.
395 *
396 * Arguments: el, the event list;
397 * member_id, the member identification;
398 * sender_ip, the IP address of the sender. Used for host related timer generation
399 * action, event action to be executed;
400 * sn, sequence number of the message to retransmited/requested.
401 *
402 ***************************************************************************************/
403
eventListInsert(EVENT_LIST ** el,MEMBER_ID * member_id,char * sender_ip,char action,int sn)404 int eventListInsert(EVENT_LIST **el, MEMBER_ID *member_id, char *sender_ip, char action, int sn)
405 {
406 EVENT_LIST *newNode, *auxNode, *antNode;
407 long int timer_value=0, accumulator=0;
408 long double remaining_time;
409 struct timeval current_time;
410 int retval = -2;
411
412 #ifdef SINGLE_NACK
413
414 int i = -1;
415 int nack_sn = -1;
416 int aux_bit_value = 0;
417 int event_flag = 0;
418 int control = 0;
419 int cont = 0;
420
421 CACHE *aux_cache_member;
422
423 cacheShow(cache);
424
425 aux_cache_member = cacheLookForMember(&cache, member_id);
426
427 #ifdef DSINGLE_NACK
428 fprintf(stderr,"DSINGLE_NACK eventListInsert: [aux_cache_member: %p &cache: %p member_id: %p]\n",aux_cache_member,&cache,member_id);
429 #endif
430 #ifdef DNAK_RCV
431 fprintf(stderr,"DNAK_RCV eventListInsert: entering. Action=%d sn=%d\n",action,sn);
432 #endif
433 if( aux_cache_member != NULL )
434 {
435 switch ( action )
436 {
437
438 case NAK_SND_WAIT: /* Trying to insert a single NACK */
439
440 for ( i = aux_cache_member->sm_info.member_status.last_seq_rcv+1;
441 i <= aux_cache_member->sm_info.member_status.window_size + aux_cache_member->sm_info.member_status.last_seq_rcv ;
442 i ++ )
443 {
444 /* Note: the first bit of the mask always refer to the packet with sn = last_seq_rcv + 1 */
445
446 #ifdef DWINDOW_MASK
447 fprintf(stderr,"DWINDOW_MASK eventListInsert: before window_mask_get_bit call. Event action=%d sn=%d i=%d\n",action,sn,i);
448 #endif
449 aux_bit_value = window_mask_get_bit( aux_cache_member->sm_info.member_status.window_mask,i, &aux_cache_member->sm_info.member_status );
450
451 if ( ( aux_bit_value == NAK_SND_WAIT ) || ( aux_bit_value == RET_RCV_WAIT ) )
452 {
453 /* NAK_SND_WAIT or RET_RCV_WAIT already exists */
454 control = aux_bit_value;
455 break;
456 }
457
458 }
459
460 if ( control == RET_RCV_WAIT )
461 {
462 /* RET_RCV_WAIT event exists - just set the bit to RET_RCV_WAIT, the nack will be send in the next time */
463
464 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,sn, &aux_cache_member->sm_info.member_status, RET_RCV_WAIT );
465 }
466 else
467 {
468
469 if( cacheUpdateNakList(&cache, member_id, sn) > 0 )
470 {
471 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,sn, &aux_cache_member->sm_info.member_status, NAK_SND_WAIT );
472 }
473 else
474 {
475 /*
476 * We have sent the max number of NAK (see rmcast_options structure) we didn't recover the packet
477 * so we must exit because we cannot keep the reliability
478 */
479
480 fprintf(stderr,"********\n");
481 fprintf(stderr,"eventListInsert ERROR: Recovering packet failed. Max number (%d) of NAKs reached!\n",rmcast_options.max_nak);
482 fprintf(stderr,"\tHost IP:PID=%s:%d\tsn=%d\n",member_id->ip,member_id->pid,i);
483 fprintf(stderr,"********\n");
484
485 RM_leaveGroup(local_user_info.socket, local_user_info.member_id.ip);
486 }
487
488 if( control == 0 )
489 {
490 /* We have to insert a new event */
491 event_flag = NAK_SND_WAIT;
492 }
493 }
494
495 break;
496
497 case RET_RCV_EXPIRED: /* RET_RCV_WAIT event has expired: change bits 2 to 1 in the mask and create NAK_SND_WAIT event */
498
499 for ( i = aux_cache_member->sm_info.member_status.last_seq_rcv+1;
500 i <= aux_cache_member->sm_info.member_status.window_size +
501 aux_cache_member->sm_info.member_status.last_seq_rcv ;
502 i ++ )
503 {
504 /* Note: the first bit of the mask always refer to the packet with sn = last_seq_rcv + 1 */
505
506 #ifdef DWINDOW_MASK
507 fprintf(stderr,"DWINDOW_MASK eventListInsert: before window_mask_get_bit call. Event action=%d sn=%d i=%d\n",action,sn,i);
508 #endif
509 aux_bit_value = window_mask_get_bit( aux_cache_member->sm_info.member_status.window_mask,i, &aux_cache_member->sm_info.member_status );
510
511 if ( aux_bit_value == RET_RCV_WAIT )
512 {
513 if( cacheUpdateNakList(&cache, member_id, i) > 0 )
514 {
515 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,i, &aux_cache_member->sm_info.member_status, NAK_SND_WAIT );
516 }
517 else
518 {
519 /*
520 * We have sent the max number of NAK (see rmcast_options structure) we didn't recover the packet
521 * so we must exit because we cannot keep the reliability
522 */
523
524 fprintf(stderr,"********\n");
525 fprintf(stderr,"eventListInsert ERROR: Recovering packet failed. Max number (%d) of NAKs reached!\n",rmcast_options.max_nak);
526 fprintf(stderr,"\tHost IP:PID=%s:%d\tsn=%d\n",member_id->ip,member_id->pid,i);
527 fprintf(stderr,"********\n");
528
529 RM_leaveGroup(local_user_info.socket, local_user_info.member_id.ip);
530
531 }
532 }
533 }
534
535 event_flag = NAK_SND_WAIT;
536
537 break;
538
539
540 case RET_RCV_WAIT: /* NAK_SND_WAIT has expired: change bits 1 to 2 in the mask and create RET_RCV_WAIT event */
541
542 for( i = aux_cache_member->sm_info.member_status.last_seq_rcv+1;
543 i <= aux_cache_member->sm_info.member_status.window_size +
544 aux_cache_member->sm_info.member_status.last_seq_rcv ;
545 i ++ )
546 {
547 /* Note: the first bit of the mask always refer to the packet with sn = last_seq_rcv + 1 */
548
549 #ifdef DWINDOW_MASK
550 fprintf(stderr,"DWINDOW_MASK eventListInsert: before window_mask_get_bit call. Event action=%d sn=%d i=%d\n",action,sn,i);
551 #endif
552 aux_bit_value = window_mask_get_bit( aux_cache_member->sm_info.member_status.window_mask,i, &aux_cache_member->sm_info.member_status );
553
554 if( ( aux_bit_value == NAK_SND_WAIT ) || ( aux_bit_value == SUPPRESSED_NAK ) )
555 {
556 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask, i, &aux_cache_member->sm_info.member_status, RET_RCV_WAIT );
557 }
558 }
559
560 #ifdef DNAK_RCV
561 fprintf( stderr, "DNAK_RCV eventListInsert: Now we have to insert a RET_RCV_WAIT event!\n");
562 #endif
563 event_flag = RET_RCV_WAIT;
564
565 break;
566
567 case SUPPRESSED_NAK: /* Suppress NAK identified by sn */
568
569 for( i = aux_cache_member->sm_info.member_status.last_seq_rcv+1;
570 i <= aux_cache_member->sm_info.member_status.window_size +
571 aux_cache_member->sm_info.member_status.last_seq_rcv ;
572 i ++ )
573 {
574 /* Note: the first bit of the mask always refer to the packet with sn = last_seq_rcv + 1 */
575
576 #ifdef DWINDOW_MASK
577 fprintf(stderr,"DWINDOW_MASK eventListInsert: before window_mask_get_bit call. Event action=%d sn=%d i=%d\n",action,sn,i);
578 #endif
579 aux_bit_value = window_mask_get_bit( aux_cache_member->sm_info.member_status.window_mask,i, &aux_cache_member->sm_info.member_status );
580
581 if( ( aux_bit_value == NAK_SND_WAIT ) )
582 {
583 /* NAK_SND_WAIT already exists, add 1 to cont */
584 control = aux_bit_value;
585 nack_sn = i;
586 cont++;
587
588 }
589 if( aux_bit_value == RET_RCV_WAIT )
590 {
591 /* RET_RCV_WAIT already exists */
592 control = aux_bit_value;
593 break;
594 }
595
596 }
597
598 if (control == RET_RCV_WAIT )
599 {
600 /* No need to set the bit to SUPPRESSED_NAK because no NAK_SND_WAIT event exists */
601 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask, sn, &aux_cache_member->sm_info.member_status, RET_RCV_WAIT );
602 }
603 else if( control == NAK_SND_WAIT )
604 {
605 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask, sn, &aux_cache_member->sm_info.member_status, SUPPRESSED_NAK );
606 #ifdef DSUPPRESSION
607 fprintf(suppressionfile,"nk %s %s %d %d\n",sender_ip,member_id->ip,member_id->pid,sn);
608 #endif
609
610 if( ( nack_sn == sn ) && ( cont == 1) )
611 {
612 /*
613 * We have changed the last NAK bit to SUPPRESSED_NAK.
614 * now we have to change each SUPPRESSED_NAK bit to RET_RCV_WAIT
615 * and insert a RET_RCV_WAIT event
616 */
617
618 #ifdef DSUPPRESSION
619 fprintf(suppressionfile,"NK %s %s %d %d\n",sender_ip,member_id->ip,member_id->pid,sn);
620 #endif
621
622 for( i = aux_cache_member->sm_info.member_status.last_seq_rcv+1;
623 i <= aux_cache_member->sm_info.member_status.window_size +
624 aux_cache_member->sm_info.member_status.last_seq_rcv ;
625 i ++ )
626 {
627 /* Note: the first bit of the mask always refer to the packet with sn = last_seq_rcv + 1 */
628
629 #ifdef DWINDOW_MASK
630 fprintf(stderr,"DWINDOW_MASK eventListInsert: before window_mask_get_bit call. Event action=%d sn=%d i=%d\n",action,sn,i);
631 #endif
632 aux_bit_value = window_mask_get_bit( aux_cache_member->sm_info.member_status.window_mask,i, &aux_cache_member->sm_info.member_status );
633
634 if( ( aux_bit_value == NAK_SND_WAIT ) )
635 {
636 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask, i, &aux_cache_member->sm_info.member_status, RET_RCV_WAIT );
637 }
638 }
639
640 event_flag = RET_RCV_WAIT;
641 }
642 }
643 else
644 {
645 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask, sn, &aux_cache_member->sm_info.member_status, RET_RCV_WAIT );
646 event_flag = RET_RCV_WAIT;
647 }
648
649 break;
650
651 default:
652
653 event_flag = action;
654
655 }
656 }
657 else
658 {
659 event_flag = action;
660 }
661
662 if ( event_flag > 0 )
663 {
664 action = event_flag;
665
666 #endif /* Single NACK */
667
668 #ifdef DNAK_RCV
669 fprintf(stderr,"After switch (action) --> action == %d\n",action);
670 #endif
671 gettimeofday(¤t_time, 0);
672
673 auxNode = *el;
674
675
676 if (*el!=NULL)
677 {
678
679 getRemainingTime(el,&remaining_time,"(Called from eventListInsert)\n");
680
681 }
682 else
683 {
684 remaining_time = 0;
685 }
686 #ifdef DEBUG_EVENTS
687 fprintf(stderr,"DEBUG_EVENTS eventListInsert: current event list:\n");
688
689 eventListShow(*el);
690
691 fprintf(stderr,"DEBUG_EVENTS eventListInsert: node to be inserted:\n");
692 fprintf(stderr,"DEBUG_EVENTS eventListInsert: action: %d sn: %d\n", action, sn);
693 fprintf(stderr,"DEBUG_EVENTS eventListInsert: remaining time: %.0Lf\n", remaining_time);
694 fprintf(stderr,"DEBUG_EVENTS eventListInsert: current time %.0Lf\n\n", getTimerValue(¤t_time) );
695 #endif
696
697 if (*el!=NULL)
698 {
699
700 /* .. and set the timer value of the header of the list to the
701 remaining time to the first event occur */
702
703 (*el)->timer_value = remaining_time;
704 (*el)->last_update_time = current_time;
705
706 }
707
708 timer_value=generateTimerValue(action,rmcast_options.timer_distribution,sender_ip);
709
710 #ifdef DEBUG_EVENTS
711 fprintf(stderr,"DEBUG_EVENTS eventListInsert: time %ld\n\n", timer_value );
712 #endif
713
714 if ((eventListAllocNode(&newNode,member_id,action,sn,timer_value))==(int)NULL)
715
716 {
717 return (0);
718 }
719 if (auxNode == NULL) /* if the list is empty... */
720 {
721 (*el) = newNode;
722 }
723 else
724 {
725 if (newNode->timer_value < auxNode->timer_value)
726 {
727
728 /* if the node to be inserted is the first of the list...*/
729
730 (newNode)->next = auxNode;
731 (*el) = newNode;
732 (newNode)->next->timer_value -= newNode->timer_value;
733 }
734 else
735 {
736 antNode = auxNode;
737 accumulator = auxNode->timer_value;
738 while(accumulator < newNode->timer_value)
739 {
740 antNode = auxNode;
741 auxNode = auxNode->next;
742 if (auxNode==NULL)
743 break;
744 accumulator += auxNode->timer_value;
745 }
746 if (auxNode == NULL)
747 {
748 /* ... if the node to be inserted is the last of the list... */
749
750 antNode->next = newNode;
751 newNode->timer_value -= accumulator;
752 }
753 else
754 {
755 if (auxNode!=antNode)
756 {
757
758 /* ... if the node to be inserted is in the middle of the list */
759 newNode->next = auxNode;
760 antNode->next = newNode;
761 #ifdef DEBUG_EVENTS
762 fprintf(stderr,"DEBUG_EVENTS eventListInsert: accumulator: %ld newNode->timer_value: %ld antNode->timer_value: %ld auxNode->timer_value: %ld",
763 accumulator , newNode->timer_value , antNode->timer_value, auxNode->timer_value);
764 #endif
765 newNode->timer_value -= (accumulator - auxNode->timer_value);
766 auxNode->timer_value -= newNode->timer_value;
767 }
768 else
769 {
770
771 /* ... finally, if the node to be inserted has a time value
772 equal to the first of the list... */
773
774 newNode->next = antNode->next;
775 antNode->next = newNode;
776 newNode->timer_value = 0;
777 }
778 }
779 }
780 }
781
782 #ifdef DEBUG_SHOW
783 eventListShow(*el);
784 #endif
785
786 #ifdef DEBUG_EVENTS
787 fprintf(stderr,"DEBUG_EVENTS eventListInsert: sending SIGUSR1 signal\n");
788 #endif
789 retval = pthread_kill(signal_handler_thread, SIGUSR1);
790
791 if ( retval )
792 {
793 fprintf(stderr,"eventListInsert ERROR: pthread_kill error: %d \n",retval);
794
795 }
796 #ifdef DEBUG_EVENTS
797 else
798 {
799 fprintf(stderr,"DEBUG_EVENTS eventListInsert: signal sent\n");
800 }
801 #endif
802 #ifdef DNAK_RCV
803 fprintf(stderr, "eventListInsert: showing Event List before leaving\n");
804 eventListShow(*el);
805 #endif
806
807 #ifdef SINGLE_NACK
808 } /* if (control >0 ) */
809 #endif
810
811 return (1);
812 }
813
814 /***************************************************************************************
815 *
816 * void eventListShow(EVENT_LIST *el)
817 *
818 * Shows the event list.
819 *
820 * Arguments: el, the event list.
821 *
822 ***************************************************************************************/
823
eventListShow(EVENT_LIST * el)824 void eventListShow(EVENT_LIST *el)
825 {
826
827 #ifdef DEBUG_EVENTS
828
829 EVENT_LIST *aux;
830 int i=0;
831 long double remaining_time;
832
833 struct timeval current_time;
834
835 gettimeofday(¤t_time, 0);
836
837 aux = el;
838
839 fprintf(stderr,"\n----------EventList------------\n\n");
840
841 if (aux==NULL)
842 {
843 fprintf(stderr,"EventList is EMPTY\n");
844 fprintf(stderr,"\n------------------------------\n\n");
845 return;
846 }
847
848 fprintf(stderr,"aux: %p\n",aux);
849
850 for (;aux!=NULL;aux=aux->next)
851 {
852 fprintf(stderr," [%p](ip=%s pid=%d action=%2d sn=%2d timer=%8ld) -> ",
853 aux,
854 ((aux->member_id!=NULL)?aux->member_id->ip:"NULL"),
855 ((aux->member_id!=NULL)?aux->member_id->pid:-1),
856 aux->action,
857 aux->sn,
858 aux->timer_value);
859 if (i%2) fprintf(stderr,"\n");
860 i++;
861 }
862 fprintf(stderr," NULL\n");
863 fprintf(stderr," eventListShow: number of nodes in event list=%d\n",i);
864 if (el!=NULL)
865 {
866 getRemainingTime(&el,&remaining_time,"Called from eventListShow");
867 }
868
869 fprintf(stderr,"\n------------------------------\n\n");
870
871 #endif
872 }
873
874
875 /***************************************************************************************
876 *
877 * int eventListRemoveFirst(EVENT_LIST **el)
878 *
879 * Removes the first element of the event list.
880 *
881 * Arguments: el, the event list.
882 *
883 * Return: 1, on success;
884 * 0, on fail.
885 *
886 * Pay Attention!
887 *
888 * This function neither sends any signal, nor calls any time related function.
889 *
890 ***************************************************************************************/
891
eventListRemoveFirst(EVENT_LIST ** el)892 int eventListRemoveFirst(EVENT_LIST **el)
893 {
894 EVENT_LIST *ant;
895
896 /* #ifdef SINGLE_NACK
897
898 int i = 0;
899
900 CACHE *aux_cache_member = cacheLookForMember(&cache, (*el)->member_id);
901
902 if ( (*el != NULL) && (aux_cache_member != NULL ))
903 {
904 if ( (*el) -> action == NAK_SND_WAIT )
905 {
906
907 for ( i = 0; i < MAX_WINDOW_SIZE ; i ++ )
908 {
909 if ( window_mask_get_bit2( aux_cache_member->sm_info.member_status.window_mask,
910 i ) == NAK_SND_WAIT )
911 {
912 window_mask_set_bit2( aux_cache_member->sm_info.member_status.window_mask,
913 i , RET_RCV_WAIT );
914 }
915 }
916
917 } else if ( (*el) -> action == RET_RCV_WAIT ) {
918
919 for ( i = 0; i < MAX_WINDOW_SIZE ; i ++ )
920 {
921 if ( window_mask_get_bit2( aux_cache_member->sm_info.member_status.window_mask,
922 i ) == RET_RCV_WAIT )
923 {
924 window_mask_set_bit2( aux_cache_member->sm_info.member_status.window_mask,
925 i , NAK_SND_WAIT );
926 }
927 }
928
929 }
930
931 }
932
933 #endif */
934
935
936 if (*el==NULL)
937 {
938 return (0);
939 }
940 else
941 {
942
943 ant = *el;
944 (*el) = (*el)->next;
945
946 free(ant);
947 return (1);
948 }
949 }
950
951 /***************************************************************************************
952 *
953 * int eventListRemove(EVENT_LIST **el, MEMBER_ID *member_id, char action, int sn)
954 *
955 * Arguments: el, the event list;
956 * member_id, action and sn, identify the node to be removed;
957 *
958 * Return value: 1 on success;
959 * 0 otherwise.
960 *
961 ***************************************************************************************/
962
eventListRemove(EVENT_LIST ** el,MEMBER_ID * member_id,char action,int sn)963 int eventListRemove(EVENT_LIST **el, MEMBER_ID *member_id, char action, int sn)
964 {
965 EVENT_LIST *ant_node;
966
967 int retval = 0;
968 EVENT_LIST *node_to_be_removed;
969 long double remaining_time=0;
970
971 struct timeval current_time;
972
973 #ifdef SINGLE_NACK
974
975 int i = 0;
976
977 CACHE *aux_cache_member;
978
979 aux_cache_member = cacheLookForMember(&cache, member_id);
980
981 if(aux_cache_member!=NULL)
982 {
983 #ifdef DSINGLE_NACK
984 fprintf(stderr,"DSINGLE_NACK eventListRemove: Trying to remove event action=%d\n",action);
985 #endif
986 #ifdef DNAK_RCV
987 fprintf(stderr, "eventListRemove: Trying to remove event action=%d\n",action);
988 #endif
989
990 if ( action == NAK_SND_WAIT || action == RET_RCV_WAIT )
991 {
992 if( ( sn <= aux_cache_member->sm_info.member_status.last_seq_rcv )
993 || ( sn > aux_cache_member->sm_info.member_status.last_seq_rcv + aux_cache_member->sm_info.member_status.window_size ) )
994 {
995 #ifdef DNAK_RCV
996 fprintf( stderr,"eventListRemove: There is no such event because sn=%d is out of the window_mask action=%d\n",sn,action);
997 #endif
998 return 0;
999 }
1000
1001 #ifdef DSINGLE_NACK
1002 fprintf(stderr,"DSINGLE_NACK eventlistremove: member_status.last_seq_rcv=%d,member_status.window_ini=%d\n",aux_cache_member->sm_info.member_status.last_seq_rcv,aux_cache_member->sm_info.member_status.window_ini);
1003 #endif
1004 if ( window_mask_get_bit( aux_cache_member->sm_info.member_status.window_mask,
1005
1006 sn, &(aux_cache_member->sm_info.member_status) ) == action )
1007 {
1008
1009 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,
1010
1011 sn, &(aux_cache_member->sm_info.member_status), 0 );
1012
1013 /* Remove node from member's NAK list */
1014 /* FIXME cacheDeleteNakNode( cache, member_id, sn); */
1015
1016 for ( i = aux_cache_member->sm_info.member_status.last_seq_rcv + 1;
1017 i <= aux_cache_member->sm_info.member_status.window_size + aux_cache_member->sm_info.member_status.last_seq_rcv ;
1018 i ++ )
1019 {
1020
1021 if ( window_mask_get_bit( aux_cache_member->sm_info.member_status.window_mask,
1022 i, &aux_cache_member->sm_info.member_status ) == action )
1023 {
1024 return 1;
1025 }
1026 }
1027 }
1028 else
1029 return 0;
1030 }
1031 }
1032
1033 #endif /*SINGLE NACK*/
1034
1035 node_to_be_removed= eventListFind(el,member_id,action,sn,&ant_node);
1036 #ifdef DSINGLE_NACK
1037 fprintf(stderr,"DSINGLE_NACK eventlistRemove: after looking for event, action=%d,sn=%d.\n",action,sn);
1038 #endif
1039 #ifdef DNAK_RCV
1040 fprintf(stderr, "eventlistRemove: after looking for event, action=%d,sn=%d.\n",action,sn);
1041 #endif
1042
1043 if (node_to_be_removed==NULL)
1044 {
1045 #ifdef DSINGLE_NACK
1046 fprintf(stderr,"DSINGLE_NACK eventlistRemove:event (action=%d sn=%d) not found.\n",action,sn);
1047 #endif
1048 #ifdef DNAK_RCV
1049 fprintf(stderr, "eventlistRemove:event (action=%d sn=%d) not found.\n",action,sn);
1050 #endif
1051
1052 return (0);
1053 }
1054 else
1055 {
1056 #ifdef DSINGLE_NACK
1057 fprintf(stderr,"DSINGLE_NACK eventlistRemove: event (action=%d sn=%d) found.\n",action,sn);
1058 #endif
1059 #ifdef DNAK_RCV
1060 fprintf(stderr, "eventlistRemove: event (action=%d sn=%d) found.\n",action,sn);
1061 #endif
1062
1063 if (ant_node == NULL)
1064 {
1065 /* the node to be removed is the first node of the list */
1066
1067 if (*el!=NULL)
1068 {
1069 getRemainingTime(el,&remaining_time,"(eventListRemove)\n");
1070 }
1071 else
1072 {
1073 remaining_time = 0;
1074 }
1075 *el = (*el)->next;
1076
1077 if (*el!=NULL)
1078 {
1079 (*el)->timer_value += remaining_time;
1080
1081 gettimeofday(¤t_time, 0);
1082
1083 (*el)->last_update_time = current_time;
1084
1085 if ((*el)->timer_value == 0)
1086 {
1087
1088 #ifdef DEBUG_EVENTS
1089 fprintf(stderr,"DEBUG_EVENTS eventListRemove warning: the alarm was restarted.\n");
1090 #endif
1091 eventListShow(*el);
1092 }
1093 }
1094
1095 retval = pthread_kill(signal_handler_thread, SIGUSR1);
1096
1097 if ( retval )
1098 {
1099 fprintf(stderr,"evenListRemove ERROR: trying to kill signal handler thread retval=%d \n",retval);
1100
1101 }
1102
1103
1104 }
1105 else
1106 {
1107 if (node_to_be_removed->next!=NULL)
1108 {
1109 node_to_be_removed->next->timer_value += node_to_be_removed->timer_value;
1110 ant_node->next = node_to_be_removed -> next;
1111 }
1112 else
1113 {
1114 /* the node to be removed is the last node of the list */
1115 ant_node->next = NULL;
1116 }
1117 }
1118 }
1119 free(node_to_be_removed);
1120 node_to_be_removed = NULL;
1121
1122 return (1);
1123 }
1124
eventListIsEqual(EVENT_LIST * node1,EVENT_LIST * node2)1125 int eventListIsEqual(EVENT_LIST *node1, EVENT_LIST *node2)
1126 {
1127 if ((node1->member_id == NULL) || (node2->member_id == NULL))
1128 {
1129
1130 #ifdef SINGLE_NACK
1131
1132 #ifdef DSINGLE_NACK
1133 fprintf(stderr,"DSINGLE_NACK eventListIsEqual: node null, node1=%p node2=%p\n",node1->member_id,node2->member_id);
1134 #endif
1135
1136 if ( node1->action == NAK_SND_WAIT || node1->action == RET_RCV_WAIT )
1137 {
1138 if ( node1->action == node2->action )
1139 {
1140
1141 #ifdef DSINGLE_NACK
1142 fprintf(stderr,"DSINGLE_NACK eventListIsEqual: (first return) event is equal action=%d\n",node1->action);
1143 #endif
1144 return 1;
1145 }
1146 else
1147 {
1148 #ifdef DSINGLE_NACK
1149 fprintf(stderr,"DSINGLE_NACK eventListIsEqual: (first return) event is not equal action=%d\n",node1->action);
1150 #endif
1151 return 0;
1152 }
1153 }
1154 #endif
1155 if ( (node1->action == node2->action) &&
1156 (node1->sn == node2->sn))
1157
1158 return 1;
1159
1160 else
1161
1162 return 0;
1163 }
1164
1165 #ifdef SINGLE_NACK
1166
1167 if ( node1->action == NAK_SND_WAIT || node1->action == RET_RCV_WAIT )
1168 {
1169
1170 if ((!strcmp(node1->member_id->ip, node2->member_id->ip))
1171 && (node1->member_id->pid == node2->member_id->pid)
1172 && (node1->action == node2->action)
1173 )
1174 {
1175 #ifdef DSINGLE_NACK
1176 fprintf(stderr,"DSINGLE_NACK eventListIsEqual: (second return) event is equal action=%d\n",node1->action);
1177 #endif
1178 return (1);
1179
1180 }
1181 else
1182 {
1183 #ifdef DSINGLE_NACK
1184 fprintf(stderr,"DSINGLE_NACK eventListIsEqual: (second return) event is equal action=%d\n",node1->action);
1185 #endif
1186 return (0);
1187
1188 }
1189 }
1190
1191 #endif
1192
1193 if ((!strcmp(node1->member_id->ip, node2->member_id->ip))
1194 && (node1->member_id->pid == node2->member_id->pid)
1195 && (node1->action == node2->action)
1196 && (node1->sn == node2->sn))
1197
1198 return (1);
1199 else
1200 return (0);
1201 }
1202
1203 /***************************************************************************************
1204 *
1205 * EVENT_LIST * eventListFind(EVENT_LIST **el1,MEMBER_ID *member_id, char action,
1206 * int sn, EVENT_LIST **antNode)
1207 *
1208 * Search for a node in the event list.
1209 *
1210 * Arguments: el1, the event list;
1211 * member_id, action and sn, identify the node to be found;
1212 * antNode, return a pointer to the node that points to the one,
1213 * or NULL if the node removed is the last one.
1214 *
1215 * Return value: a pointer to the node found, or NULL if the node wasn't found.
1216 *
1217 ***************************************************************************************/
1218
1219
eventListFind(EVENT_LIST ** el1,MEMBER_ID * member_id,char action,int sn,EVENT_LIST ** antNode)1220 EVENT_LIST * eventListFind(EVENT_LIST **el1,MEMBER_ID *member_id, char action, int sn, EVENT_LIST **antNode)
1221 {
1222 EVENT_LIST *el;
1223
1224 EVENT_LIST *event_node, *auxPointer;
1225
1226 *antNode = NULL;
1227
1228 el = *el1;
1229
1230 auxPointer = el;
1231
1232 #ifdef SINGLE_NACK
1233
1234 {
1235
1236 CACHE *aux_cache_member = NULL;
1237
1238 aux_cache_member = cacheLookForMember(&cache, member_id);
1239
1240 #ifdef DNAK_RCV
1241 fprintf(stderr,"eventListFind: looking for ip:pid:sn:action = %s:%d:%d:%d\n",
1242 member_id->ip,
1243 member_id->pid,
1244 sn,
1245 action);
1246 eventListShow(el);
1247 #endif
1248
1249 if ( aux_cache_member != NULL )
1250 {
1251 if ( sn > ( aux_cache_member->sm_info.member_status.last_seq_rcv + aux_cache_member->sm_info.member_status.window_size ) )
1252 {
1253
1254 #ifdef DSINGLE_NACK
1255 fprintf(stderr,"DSINGLE_NACK eventListFind: sn=%d is out of window(last_seq_rcv+1,last_seq_rcv+MAX_WINDOW_SIZE)=(%d,%d)!\n",
1256 sn,
1257 aux_cache_member->sm_info.member_status.last_seq_rcv,
1258 aux_cache_member->sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE);
1259 #endif
1260 #ifdef DNAK_RCV
1261 fprintf(stderr,"eventListFind: sn=%d is out of window(last_seq_rcv+1,last_seq_rcv+MAX_WINDOW_SIZE)=(%d,%d)!\n",
1262 sn,
1263 aux_cache_member->sm_info.member_status.last_seq_rcv,
1264 aux_cache_member->sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE);
1265
1266 #endif
1267
1268 return NULL;
1269 }
1270 }
1271 else
1272 {
1273
1274 #ifdef DSINGLE_NACK
1275 fprintf(stderr,"DSINGLE_NACK eventListFind: no such member found in the cache!\n");
1276 #endif
1277 #ifdef DNAK_RCV
1278 fprintf(stderr,"eventListFind: member found in the cache: %s:%d\n",member_id->ip,member_id->pid);
1279 #endif
1280 return NULL;
1281 }
1282 }
1283
1284 #endif
1285
1286 #ifdef DEBUG_EVENTS
1287
1288 fprintf(stderr,"DEBUG_EVENTS eventListFind: Inside listfind\n ip: %s\n pid: %d\n action:%d\n sn: %d\n######\n",
1289 member_id->ip,
1290 member_id->pid,
1291 action,
1292 sn);
1293
1294 #ifdef DEBUG_SHOW
1295 eventListShow(el);
1296 #endif
1297
1298 #endif
1299
1300 if ((eventListAllocNode(&event_node, member_id, action, sn, 0))==(int)NULL)
1301 {
1302 perror("eventListFind: Alloc list node at eventListFind");
1303 RM_leaveGroup(local_user_info.socket, local_user_info.member_id.ip);
1304 }
1305
1306 while (auxPointer!=NULL)
1307 {
1308
1309 if (eventListIsEqual(event_node, auxPointer))
1310 {
1311 /* The event was found! */
1312
1313 break;
1314 }
1315 *antNode = auxPointer;
1316 auxPointer = auxPointer->next;
1317 }
1318
1319 free(event_node);
1320
1321 return (auxPointer);
1322 }
1323
1324 #endif
1325
1326