1 /*
2  * Copyright (C) 2002-2009, Edmundo Albuquerque de Souza e Silva.
3  *
4  * This file may be distributed under the terms of the Q Public License
5  * as defined by Trolltech AS of Norway and appearing in the file
6  * LICENSE.QPL included in the packaging of this file.
7  *
8  * THIS FILE IS PROVIDED AS IS WITH NO WARRANTY OF ANY KIND, INCLUDING
9  * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
10  * PURPOSE.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
11  * INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
12  * FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
13  * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
14  * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15  *
16  */
17 
18 /***************************************************************************
19                              rminternals.c
20                              -------------------
21     begin                : May 2001
22     Authors              : Jorge Allyson Azevedo
23                            Milena Scanferla
24                            Daniel Sadoc
25     email                : {allyson,milena,sadoc}@land.ufrj.br
26  ***************************************************************************/
27 
28 #ifndef RMINTERNALS_C
29 #define RMINTERNALS_C
30 
31 #include "rminternals.h"
32 #include "rmcast.h"
33 
34 #include <pthread.h>
35 #include <string.h>
36 #include <signal.h>
37 #include <errno.h>
38 
39 #include <sys/time.h>
40 #include <unistd.h>
41 #include <stdio.h>
42 #include <time.h>
43 
44 #include "rmwinmask.h"
45 
46 extern pthread_t rcv_thread, signal_handler_thread;
47 extern char *EVENT_ACTION_NAME[6];
48 extern char *MESSAGE_TYPE_NAME[NUMBER_OF_MESSAGE_TYPES];
49 FILE        *logfile=NULL;
50 
51 #ifdef DSUPPRESSION
52     extern FILE *suppressionfile;
53 #endif
54 
55 int STOP_TRANSMISSION=0;
56 
57 EVENT_LIST *event_list=NULL;
58 
59 #ifdef SINGLE_NACK
60 
61     EVENT_LIST *nak_event=NULL;
62     EVENT_LIST *wait_for_retransm_event=NULL;
63 
64 #endif
65 
66 CACHE      *cache=NULL;
67 USER_INFO  local_user_info;
68 sigset_t   alrmset;
69 
70 GLOBAL_OPTIONS rmcast_options =  {
71     /*   int        *pipe; */
72     0 ,
73     /*    char       tcp_ip[IP_STRING_SIZE]; */
74     { 0 },
75     /*    int        tcp_port; */
76     10013,
77     /*    void      ( *shut_down_routine )( void ) = NULL; */
78     0,
79     /*    int        cur_state_server_is_up = 0; */
80     0,
81     /*    char       dest_ip[IP_STRING_SIZE];  */
82     { "225.1.2.3" } ,
83     /*    int        dest_port; */
84     5151,
85     /* int        ttl; */
86     1,
87     /* unsigned int        microsleep; */
88     10,
89     /* int     hosts_identified; default 0*/
90     0,
91     /* int     timer_distribution; */
92     UNIFORM,
93     /* int     max_nak; default 100 */
94     100,
95     /* int     max_member_cache_size; default 4000 */
96     4000,
97     /* int     version; */
98     1,
99     /* int        transmission_mode; */
100     MULTICAST,
101     /* char       log_file[255]; */
102     "NULL",
103     /* int        new_member_support; */
104     0,
105     /* int        statistics; */
106     0,
107     /* int        refresh_timer; */
108     10,
109     /* float      loss_prob; */
110     0,
111     /* int        leave_group_wait_time; */
112     500000,
113     /* int        rcv_buffer_size;         */
114     10000,
115     /* HOSTS_DELAYS hosts_delays */
116     {
117         { "DEFAULT", /* host IP */
118           50000 /* one-way delay to host( microseconds ) */
119         }
120     },
121     /* parameters to obtain the timer interval */
122     2,
123     2,
124     5,
125     2,
126     2,
127     2
128 };
129 
130 extern pthread_mutex_t  change_local_user_sn;
131 extern pthread_mutex_t  event_list_mutex;
132 extern pthread_mutex_t  cache_mutex;
133 
134 /*********************** Main routines ************************************************************/
135 
lock_eventlist_and_cache()136 void lock_eventlist_and_cache( )
137 {
138     pthread_mutex_lock( &event_list_mutex );
139     pthread_mutex_lock( &cache_mutex );
140 }
141 
unlock_eventlist_and_cache()142 void unlock_eventlist_and_cache( )
143 {
144     pthread_mutex_unlock( &cache_mutex );
145     pthread_mutex_unlock( &event_list_mutex );
146 }
147 
148 
149 /*****************************************************************************************************
150  *
151  * int    rmcastCreateSocket ( int port )
152  *
153  * Creates a multi-cast socket, using the specified port.
154  *
155  * Returns the socket handler.
156  *
157  ******************************************************************************************************/
rmcastCreateSocket(int port)158 int rmcastCreateSocket( int port )
159 {
160 
161     int    aux = 1  ;
162     struct sockaddr_in saddr;
163 
164     local_user_info.socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
165 
166     bzero(( char* ) &saddr, sizeof( saddr ) );
167 
168     saddr.sin_family = AF_INET;
169     saddr.sin_port = htons( port );
170     saddr.sin_addr.s_addr = htonl( INADDR_ANY );
171 
172     if( setsockopt( local_user_info.socket, SOL_SOCKET, SO_REUSEADDR, ( char* )&aux, sizeof( aux ) ) == -1 )
173         perror( "reuse addr error" );
174 
175     if( bind( local_user_info.socket, ( struct sockaddr * )&saddr, sizeof( saddr ) ) == -1 )
176     {
177     perror( "binding error" );
178     close( local_user_info.socket );
179     local_user_info.socket = -1;
180     }
181 
182     if( rmcast_options.transmission_mode == MULTICAST )
183     {
184         rmcastSetTTLValue( local_user_info.socket, rmcast_options.ttl );
185         rmcastSetLoopBack( local_user_info.socket, 1 );
186     }
187 
188     return( local_user_info.socket );
189 }
190 
rmcastSetTTLValue(int soc,int ttl)191 void rmcastSetTTLValue( int soc, int ttl )
192 {
193 #ifndef SOLARIS
194     if( setsockopt( soc, IPPROTO_IP, IP_MULTICAST_TTL, ( char* )&ttl, sizeof( ttl ) ) ==-1 )
195     perror( "ttl" );
196 #endif
197 }
198 
rmcastSetLoopBack(int soc,int loop)199 void rmcastSetLoopBack( int soc, int loop )
200 {
201 #ifndef SOLARIS
202     if( setsockopt( soc, IPPROTO_IP, IP_MULTICAST_LOOP, ( char* )&loop, sizeof( loop ) ) ==-1 )
203         perror( "loop" );
204 #endif
205 }
206 
207 /*****************************************************************************************************
208  *
209  * void rmcastJoinGroup ( int soc, char *group_addr )
210  *
211  * Joins the user using the socket soc to the multicast group.
212  *
213  * Arguments:   the socket, in soc;
214  *              the multicast group ip, in group_addr
215  *
216  ******************************************************************************************************/
rmcastJoinGroup(int soc,char * group_addr)217 void rmcastJoinGroup ( int soc, char *group_addr )
218 {
219     struct sockaddr_in group;
220     struct ip_mreq mreq;
221 
222     if( rmcast_options.transmission_mode == MULTICAST )
223     {
224         fprintf( stdout,"rmcastJoinGroup: Joining group %s\n", group_addr );
225 
226         if(( group.sin_addr.s_addr = inet_addr( group_addr ) ) == ( unsigned )-1 )
227             perror( "rmcastJoingGroup: inet_addr" );
228 
229         mreq.imr_multiaddr      = group.sin_addr;
230         mreq.imr_interface.s_addr = INADDR_ANY;
231 
232         if( setsockopt( soc, IPPROTO_IP, IP_ADD_MEMBERSHIP, ( char * )&mreq, sizeof( mreq ) ) == -1 )
233             perror( "rmcastJoingGroup: ADD_MEMBERSHIP" );
234     }
235 }
236 
237 /*****************************************************************************************************
238  *
239  * void rmcastLeaveGroup ( int soc, char *group_addr )
240  *
241  * Used when the user using the socket soc wants to leave the multicast group.
242  *
243  * Arguments:   the socket, in soc;
244  *              the multicast group ip, in group_addr
245  * Returns: 1, on success;
246  *          0, on error.
247  *
248  ******************************************************************************************************/
rmcastLeaveGroup(int soc,char * group_addr)249 int rmcastLeaveGroup ( int soc, char *group_addr )
250 {
251     BYTE *buffer;
252     int buffsize = 0;
253 
254     PACKET_INFO pckt_info;
255 
256 #ifdef REFRESH
257 /* Mounting and sending a Refresh packet before leaving the group but only
258 if we have already sent one data packet  */
259     if( local_user_info.sn > 0 ){
260 
261         pckt_info.type = REFRESH_PACKET_TYPE;
262         pckt_info.flags = 0;
263         pckt_info.sender_id = local_user_info.member_id;
264         pckt_info.packet_size = sizeof( REFRESH_PACKET );
265 
266         pthread_mutex_lock( &change_local_user_sn );
267         pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent = local_user_info.sn - 1;
268         pthread_mutex_unlock( &change_local_user_sn );
269 
270     #ifdef DEBUG_EVENTS
271         fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Mounting refresh packet.\n" );
272     #endif
273         msgPcktMountMessage( &pckt_info, ( BYTE** )&buffer, &buffsize );
274         if ( rmcastSendPacket( soc, buffer, buffsize ) == 0 )
275             return 0;
276     }
277 #endif /* REFRESH */
278 
279 /* Mounting and sending a Leave group packet */
280     pckt_info.type = LEAVE_GROUP_PACKET_TYPE;
281     pckt_info.flags = 0;
282     pckt_info.sender_id = local_user_info.member_id;
283     pckt_info.packet_size = 0;
284 
285 #ifdef DEBUG_NET_MSGS
286     fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: mounting message\n" );
287 #endif
288 
289     msgPcktMountMessage( &pckt_info, ( BYTE** )&buffer, &buffsize );
290 
291 #ifdef DEBUG_NET_MSGS
292     fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: mounted 'leave group' message\n" );
293 #endif
294 
295     if ( rmcastSendPacket( soc, buffer, buffsize ) == 0 )
296         return 0;
297 
298 #ifdef DEBUG_NET_MSGS
299     fprintf( stderr,  "DEBUG_NET_MSGS rmcastLeaveGroup: sent 'leave group' message\n" );
300 #endif
301 
302 #ifdef DEBUG_NET_MSGS
303     fprintf( stderr,  "DEBUG_NET_MSGS rmcastLeaveGroup: Disconnecting... %s\n", group_addr );
304 #endif
305 
306     lock_eventlist_and_cache( );
307     eventListInsert( &event_list,NULL, NULL, LEV_GRP_WAIT, -1 );
308     unlock_eventlist_and_cache( );
309 
310 #ifdef DEBUG_NET_MSGS
311     fprintf( stderr,  "DEBUG_NET_MSGS rmcastLeaveGroup: returning\n" );
312 #endif
313 
314     return 1;
315 }
316 
317 /*****************************************************************************************************
318  *
319  * void showLogMsg( char type, PACKET_INFO *pckt_info )
320  *
321  * Prints a message from type 'type', related to the packet 'pckt_info', in the log file 'logfile'.
322  *
323  * Arguments:   the message type and the packet information.
324  *
325  * Side effects:  changes the log file.
326  *
327  ******************************************************************************************************/
showLogMsg(char type,PACKET_INFO * pckt_info)328 void showLogMsg( char type, PACKET_INFO *pckt_info )
329 {
330     struct timeval current_time;
331     char buffer[200];
332     struct tm time;
333 #ifdef DSHOW_COMPLETE_MASK_IN_LOG
334     char auxbuffer[100];
335     int n = 0;
336 #endif
337 
338     memset( buffer,0,sizeof( buffer ) );
339 
340     if( logfile!= NULL )
341     {
342         gettimeofday( &current_time, 0 );
343         time = *gmtime(( const time_t* )&current_time );
344         current_time. tv_sec =    (( time.tm_hour * 60 + time.tm_min ) * 60 ) + time.tm_sec;
345 
346         if( pckt_info !=NULL )
347         {
348             switch( pckt_info->type )
349             {
350 /*
351 ---------------------------------------------------------------------------------------------------------------------------
352 Time Snd/Rcv/Loss type sender_ip       sender_pid requested_ip    requested_pid  sn  [{base sn} {win size} {hmask} {lmask}]
353 ---------------------------------------------------------------------------------------------------------------------------
354 01234567890   x   yz   abc.def.ghi.jkl 1234567890 abc.def.ghi.jkl 1234567890   123456   123456   1234       1234    1234
355 */
356                 case DATA_PACKET_TYPE:
357 
358                      sprintf( buffer,"%15s %10d                              %6d",
359                          pckt_info->sender_id.ip,pckt_info->sender_id.pid, pckt_info->packet_data.data_packet.sn );
360 
361                      break;
362 
363                 case NAK_PACKET_TYPE:
364 #ifndef SINGLE_NACK
365                      sprintf( buffer,"%15s %10d %15s %10d   %6d",
366                              pckt_info->sender_id.ip,
367                              pckt_info->sender_id.pid,
368                              pckt_info->packet_data.nak_packet.requested_member_id.ip,
369                              pckt_info->packet_data.nak_packet.requested_member_id.pid,
370                              pckt_info->packet_data.nak_packet.sn
371                              );
372 #else
373                      sprintf( buffer,"%15s %10d %15s %10d            %6d   %4d       %4u    %4u   ",
374                              pckt_info->sender_id.ip,
375                              pckt_info->sender_id.pid,
376                              pckt_info->packet_data.nak_packet.requested_member_id.ip,
377                              pckt_info->packet_data.nak_packet.requested_member_id.pid,
378                              pckt_info->packet_data.nak_packet.base_sn,
379                              pckt_info->packet_data.nak_packet.window_size,
380                              pckt_info->packet_data.nak_packet.hmask,
381                              pckt_info->packet_data.nak_packet.lmask );
382 #ifdef DSHOW_COMPLETE_MASK_IN_LOG
383                             for( n= ( pckt_info->packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
384                             {
385                                 sprintf( auxbuffer,"%d", (( unsigned int )pckt_info->packet_data.nak_packet.hmask >> n ) & 1 );
386                                 strcat( buffer,auxbuffer );
387                             }
388 
389                             sprintf( auxbuffer,"-" );
390                             strcat( buffer,auxbuffer );
391 
392                             for( n= ( pckt_info->packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
393                             {
394                                 sprintf( auxbuffer,"%d", (( unsigned int )pckt_info->packet_data.nak_packet.lmask >> n ) & 1 );
395                                 strcat( buffer,auxbuffer );
396                             }
397 #endif /*DSHOW_COMPLETE_MASK_IN_LOG */
398 
399 #endif
400                      break;
401 
402                 case RETRANSM_PACKET_TYPE:
403                      sprintf( buffer,"%15s %10d %15s %10d   %6d",
404                              pckt_info->sender_id.ip,
405                              pckt_info->sender_id.pid,
406                              pckt_info->packet_data.retransm_packet.original_sender_id.ip,
407                              pckt_info->packet_data.retransm_packet.original_sender_id.pid,
408                              pckt_info->packet_data.retransm_packet.data_packet.sn );
409                      break;
410 
411 #ifdef REFRESH
412                 case REFRESH_PACKET_TYPE:
413                      sprintf( buffer,"%15s %10d                              %6d",
414                              pckt_info->sender_id.ip,pckt_info->sender_id.pid,
415                              pckt_info->packet_data.refresh_packet.sn_of_last_msg_sent );
416                      break;
417 #endif
418                 case JOIN_REQUEST_PACKET_TYPE:
419                      sprintf( buffer,"%15s %10d",pckt_info->sender_id.ip,pckt_info->sender_id.pid );
420                      break;
421 
422                 case JOIN_ACCEPT_PACKET_TYPE:
423                      sprintf( buffer,"%15s %10d",pckt_info->sender_id.ip,pckt_info->sender_id.pid );
424                      break;
425 
426                 case LEAVE_GROUP_PACKET_TYPE:
427                      sprintf( buffer,"%15s %10d",pckt_info->sender_id.ip,pckt_info->sender_id.pid );
428                      break;
429 
430                 default:
431                      fprintf( stderr, "Unknown packet type: %c\n",pckt_info->type );
432                      RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
433             }
434 
435             fprintf( logfile,"%.0Lf   %c   %s   %s\n",
436                (((( long double )current_time.tv_sec )*1000000+( long double )current_time.tv_usec ) ) ,
437                 type,
438                 MESSAGE_TYPE_NAME[( int )pckt_info->type],
439                 buffer );
440         }
441         else
442         {
443            fprintf( logfile,"%.0Lf     %c\n",
444                    (((( long double )current_time.tv_sec )*1000000+( long double )current_time.tv_usec ) ) ,
445                     type );
446         }
447     }
448 }
449 
450 /*****************************************************************************************************
451  *
452  * void* rmcastReceivePackets( void *user_soc )
453  *
454  * Receives packets from socket soc.
455  *
456  * Arguments:   the socket number, in soc;
457  *
458  * Side effects:  may change the state of the event list or cache.
459  *
460  ******************************************************************************************************/
rmcastReceivePackets(void * user_soc)461 void *rmcastReceivePackets( void *user_soc )
462 {
463     int    soc;
464     struct sockaddr_in sender;
465     static int first;
466     static int first_pckts_lost;
467     socklen_t len;
468     int sn;
469     int nbytes; /* number of bytes received */
470     char message[rmcast_options.rcv_buffer_size];
471     PACKET_INFO pckt_info;
472     MEMBER_ID aux_member_id;
473 #ifdef DPROCESSING_TIME
474     struct timeval time_before,time_after;
475     struct tm time;
476     FILE *proc_log;
477     char tipo;
478 #endif
479 
480     soc   = ( int )user_soc;
481     sn    = 0;
482     first = 1;
483     first_pckts_lost = 10;
484 
485 #ifndef SOLARIS
486     sigset_t alrmset;
487     sigfillset( &alrmset );            /* fills the alarm set with all the signals */
488 
489     /* blocks all the signals, because they won�t be handled by this thread */
490     pthread_sigmask( SIG_BLOCK, &alrmset, NULL );
491 #endif
492 #ifdef DPROCESSING_TIME
493     proc_log=fopen( "proc_times.log","w" );
494 #endif
495     do{
496         bzero( message, rmcast_options.rcv_buffer_size );
497 
498     #ifdef DEBUG_NET_MSGS
499         fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: Waiting to receive a message...\n" );
500     #endif
501         len    = sizeof( struct sockaddr_in );
502         nbytes = ( int )recvfrom( soc, message, rmcast_options.rcv_buffer_size, 0, ( struct sockaddr * ) &sender, &len );
503 
504     #ifdef DEBUG_NET_MSGS
505         fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: A message was received from the network!( message size=%d )\n",nbytes );
506     #endif
507 
508         if( nbytes <= 0 )
509         {
510             fprintf( stderr, "rmcastReceivePackets: Error while receiving a message from the net - nbytes=%d \n",nbytes );
511         }
512         else /* nbytes > 0 */
513         {
514 #ifdef DEBUG_PCKT
515             {
516                 int cont;
517                 fprintf( stderr, "DEBUG_PCKT rmcastReceivePackets: Received message:\n" );
518                 for( cont = 0; cont < nbytes; cont++ )
519                 {
520                     fprintf( stderr, "%d", ( ( signed char * )message )[cont] );
521                 }
522                 fprintf( stderr, "\n" );
523             }
524 #endif
525             msgPcktUnmountMessage( &pckt_info, ( BYTE* )message );
526 
527 #ifdef DPROCESSING_TIME
528         gettimeofday( &time_before, 0 );
529         time = *gmtime(( const time_t* )&time_before );
530         time_before.tv_sec = (( time.tm_hour * 60 + time.tm_min ) * 60 ) + time.tm_sec;
531         tipo = pckt_info.type;
532 
533 #endif
534 
535             if( memberIsEqual( &( pckt_info.sender_id ), &( local_user_info.member_id ) ) )
536             {
537                 /* We have received our own packet, so we just log and go to the next packet */
538                 showLogMsg( RECEIVE,&pckt_info );
539                 continue;
540             }
541 
542 
543 #ifdef DEBUG_NET_MSGS
544             fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: message type=%s\n",MESSAGE_TYPE_NAME[( int )( pckt_info.type )] );
545 #endif
546 
547             switch( pckt_info.type )
548             {
549                 case DATA_PACKET_TYPE:
550                      sn = pckt_info.packet_data.data_packet.sn; break;
551                 case NAK_PACKET_TYPE:
552                      sn = pckt_info.packet_data.nak_packet.sn;
553                      #ifdef DSINGLE_NACK
554 
555                         fprintf( stderr, "NAK received:\n\tFrom:%s:%d \n\tRequesting: %s:%d:%d \n\twin_size:%d \tbase_sn:%d \thmask:%u \tlmask:%u \n",
556                                          pckt_info.sender_id.ip,
557                                          pckt_info.sender_id.pid,
558                                          pckt_info.packet_data.nak_packet.requested_member_id.ip,
559                                          pckt_info.packet_data.nak_packet.requested_member_id.pid,
560                                          pckt_info.packet_data.nak_packet.sn,
561                                          pckt_info.packet_data.nak_packet.window_size,
562                                          pckt_info.packet_data.nak_packet.base_sn,
563                                         ( unsigned int )pckt_info.packet_data.nak_packet.hmask,
564                                         ( unsigned int )pckt_info.packet_data.nak_packet.lmask );
565                         {
566 
567                             int n;
568 
569                             fprintf( stderr, "\tHMASK:" );
570                             for( n = ( pckt_info.packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
571                             {
572                                 fprintf( stderr, "%d", (( unsigned int )pckt_info.packet_data.nak_packet.hmask >> n ) & 1 );
573                             }
574 
575                             fprintf( stderr, "\n\tLMASK:" );
576 
577                             for( n = ( pckt_info.packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
578                             {
579                                 fprintf( stderr, "%d", (( unsigned int )pckt_info.packet_data.nak_packet.lmask >> n ) & 1 );
580                             }
581 
582                             fprintf( stderr, "\n" );
583                         }
584 
585 
586                      #endif
587                      break;
588                 case RETRANSM_PACKET_TYPE:
589                      sn = pckt_info.packet_data.retransm_packet.data_packet.sn; break;
590                 case REFRESH_PACKET_TYPE:
591                      sn = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent; break;
592                 case JOIN_REQUEST_PACKET_TYPE:
593                      sn = -1; break;
594                 case JOIN_ACCEPT_PACKET_TYPE:
595                      sn = -1; break;
596                 case LEAVE_GROUP_PACKET_TYPE:
597                      sn = -1; break;
598                 default:
599                      fprintf( stderr, "rmcastReceivePackets: Unknown packet type.\n" );
600                      RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
601             }
602 #ifdef DEBUG_NET_MSGS
603             fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: sn=%d\n",sn );
604 #endif
605 
606 #ifdef DEBUG_SHOW
607             msgPcktShowMessage(( BYTE * )message );
608 #endif
609 
610 /* ------------------------ Loss Simulation begin----------------------------------------- */
611             if( rmcast_options.loss_prob > 0 )
612             {
613                 /* Uncomment the following line if don't want naks and retransmission to be lost
614 
615                 if( pckt_info.type != NAK_PACKET_TYPE && pckt_info.type != RETRANSM_PACKET_TYPE ) */
616                 {
617                     if( generateSample( UNIFORM ) < rmcast_options.loss_prob ||
618                         (( pckt_info.type == DATA_PACKET_TYPE ) &&( sn == 0 ) &&( first == 1 ) ) ||
619                         ( first_pckts_lost > 0 )
620                       )
621                         /*
622                          *  We are always losing the first 'first_pckts_lost'( e.g., 10 ) received packets,
623                          *  and the first received data packet -
624                          *  that is, the first burst of received packets.
625                          *  After that, packets are lost according to the rmcast_options.loss_prob.
626                          */
627                     {
628     #ifdef DEBUG_NET_MSGS
629                         fprintf( stderr, "\nDEBUG_NET_MSGS rmcastReceivePackets: PACKET LOST sn=%d\n\n",sn );
630     #endif
631                         if( logfile != NULL )
632                         {
633                             showLogMsg( RECEIVE_AND_LOSE,&pckt_info );
634                         }
635 
636                         if( pckt_info.type == DATA_PACKET_TYPE )
637                             first = 0;
638 
639                         if( first_pckts_lost>0 )
640                             first_pckts_lost--;
641 
642                         continue;
643                     }
644                 }
645             }
646 /* ------------------------ Loss Simulation end----------------------------------------- */
647             lock_eventlist_and_cache( );
648 #ifdef DMUTEX
649             fprintf( stderr, "DMUTEX rmcastReceivePackets: event list and cache LOCKED\n" );
650             showLogMsg( 'K', NULL );
651 #endif
652             if( logfile != NULL )
653             {
654                 showLogMsg( RECEIVE,&pckt_info );
655             }
656 
657             switch( pckt_info.type )
658             {
659                 case NAK_PACKET_TYPE:
660 #ifdef SINGLE_NACK
661                     {
662                         int i,aux_window_mask[MAX_WINDOW_SIZE];
663 
664                         memset( aux_window_mask, 0, ( MAX_WINDOW_SIZE*( sizeof( int ) ) ) );
665                         window_mask_numbers2array( aux_window_mask, pckt_info.packet_data.nak_packet.hmask, pckt_info.packet_data.nak_packet.lmask );
666     #ifdef DEBUG_NET_MSGS
667                         fprintf( stderr,  "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received!\n" );
668     #endif
669                         if( !memberIsEqual( &( pckt_info.packet_data.nak_packet.requested_member_id ),&( local_user_info.member_id ) ) )
670                         {
671                             for( i = 0; i < MAX_WINDOW_SIZE; i++ )
672                             {
673                                 if( aux_window_mask[i] == 1 )
674                                 {
675                                     /* Setting the real value to the sn */
676 
677                                     pckt_info.packet_data.nak_packet.sn = pckt_info.packet_data.nak_packet.base_sn + i + 1;
678                                     #ifdef DNAK_RCV
679                                         fprintf( stderr,  "DNAK_RCV we will try to remove event NAK_SND_WAIT for sn=%d\n",pckt_info.packet_data.nak_packet.sn );
680                                     #endif
681 
682                                     if( rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) )
683                                     {
684                     #ifdef DEBUG_NET_MSGS
685                                         fprintf( stderr,  "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_RCV_WAIT or a SUPPRESSED_NAK.\n" );
686                     #endif
687 
688                     #ifdef DNAK_RCV
689                                         fprintf( stderr,  "Inserting a SUPPRESSED_NAK\n" );
690                     #endif
691                                         rmcastInsertEvent( &pckt_info, SUPPRESSED_NAK );
692                                     }
693                                 }
694                             }
695                         }
696                         for( i = 0; i < MAX_WINDOW_SIZE; i++ )
697                         {
698                             if( aux_window_mask[i] == 1 )
699                             {
700                                  /* Setting the real value to the sn */
701                                 pckt_info.packet_data.nak_packet.sn = pckt_info.packet_data.nak_packet.base_sn + i + 1;
702 
703                                 #ifdef DNAK_RCV
704                                     fprintf( stderr, "DNAK_RCV rmcastReceivePackets: Before calling rmcastCacheContains sn=%d\n",pckt_info.packet_data.nak_packet.sn );
705                                 #endif
706 
707                                 if( rmcastCacheContains( &pckt_info ) &&( rmcastFindEvent( &pckt_info,RET_SND_WAIT ) == 0 ) )
708                                 {
709             #ifdef DEBUG_NET_MSGS
710                                     fprintf( stderr,  "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_SND_WAIT.\n" );
711             #endif
712 
713             #ifdef DNAK_RCV
714                                     fprintf( stderr,  "Inserting a RET_SND_WAIT: %s:%d:%d\n",
715                                                      pckt_info.packet_data.nak_packet.requested_member_id.ip,
716                                                      pckt_info.packet_data.nak_packet.requested_member_id.pid,
717                                                      pckt_info.packet_data.nak_packet.sn );
718             #endif
719                                     rmcastInsertEvent( &pckt_info,RET_SND_WAIT );
720                                 }
721                             }
722                         }
723                     }
724 #else /* using multiple nacks */
725     #ifdef DEBUG_NET_MSGS
726                     fprintf( stderr,  "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received!\n" );
727     #endif
728 
729                     if( !memberIsEqual( &( pckt_info.packet_data.nak_packet.requested_member_id ),&( local_user_info.member_id ) )
730                          && rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) )
731                     {
732     #ifdef DEBUG_NET_MSGS
733                         fprintf( stderr,  "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_RCV_WAIT.\n" );
734     #endif
735                         rmcastInsertEvent( &pckt_info,RET_RCV_WAIT );
736                     }
737                     else if( !rmcastFindEvent( &pckt_info,RET_SND_WAIT ) )
738                     {
739                         if( rmcastCacheContains( &pckt_info ) )
740                         {
741     #ifdef DEBUG_NET_MSGS
742                             fprintf( stderr,  "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_SND_WAIT.\n" );
743     #endif
744                             rmcastInsertEvent( &pckt_info,RET_SND_WAIT );
745                         }
746                     }
747 #endif /* SINGLE_NACK */
748                     break;
749 
750                 case LEAVE_GROUP_PACKET_TYPE:
751                      /*
752                       * FIXME
753                       * By now it is OK if the member does not exist in cache,
754                       * because he is leaving the group anyway :- ) So we don't
755                       * care about the return value for cacheDeactivateMember
756                       */
757                     cacheDeactivateMember( cache,&( pckt_info.sender_id ) );
758                     break;
759 
760                 case JOIN_REQUEST_PACKET_TYPE:
761                     {
762                         char *buffer;
763                         int buffsize;
764 
765                         PACKET_INFO pckt_info;
766 
767                         if( rmcast_options.cur_state_server_is_up )
768                         {
769                             pckt_info.type = JOIN_ACCEPT_PACKET_TYPE;
770                             pckt_info.flags = 0;
771                             pckt_info.sender_id = local_user_info.member_id;
772                             pckt_info.packet_size = sizeof( int );
773                             pckt_info.packet_data.join_accept_packet.port = rmcast_options.tcp_port;
774         #ifdef DEBUG_NET_MSGS
775                             fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: mounting the 'join accept packet'( port: %d )...", rmcast_options.tcp_port );
776         #endif
777                             msgPcktMountMessage( &pckt_info, ( BYTE** )&buffer, &buffsize );
778         #ifdef DEBUG_SHOW
779                             fprintf( stderr, "DEBUG_SHOW rmcastReceivePackets: ok!\n" );
780                             msgPcktShowMessage(( BYTE* )buffer );
781         #endif
782         #ifdef DEBUG_NET_MSGS
783                             fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: now, sending the message to the mcast group\n" );
784         #endif
785                             rmcastSendPacket( local_user_info.socket, ( BYTE* )buffer, buffsize );
786                         }
787                     }
788                     break;
789 
790                 case JOIN_ACCEPT_PACKET_TYPE:
791                      /*
792                       * Ignore this packet. He is handled only when we are entering the group.
793                       * See RM_getCurState at rmcast.c
794                       */
795                      break;
796 
797                 case REFRESH_PACKET_TYPE:
798 #ifdef REFRESH
799                      rmcastProcessRefreshPacket( pckt_info );
800 #endif
801                      break;
802 
803                 case RETRANSM_PACKET_TYPE:
804 
805 #ifdef DSUPPRESSION
806                      if( rmcastRemoveEvent( &pckt_info,RET_SND_WAIT ) ){
807                          fprintf( suppressionfile,"RT %s %d %s %d %d\n",
808                                  pckt_info.sender_id.ip,
809                                  pckt_info.sender_id.pid,
810                                  pckt_info.packet_data.retransm_packet.original_sender_id.ip,
811                                  pckt_info.packet_data.retransm_packet.original_sender_id.pid,
812                                  pckt_info.packet_data.retransm_packet.data_packet.sn );
813                      }
814 #else
815                      rmcastRemoveEvent( &pckt_info,RET_SND_WAIT );
816 #endif
817                     if( !rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) &&
818                         !rmcastRemoveEvent( &pckt_info,RET_RCV_WAIT ) )
819                     {
820                         if( rmcastCacheContains( &pckt_info ) )
821                             break;
822                     }
823 
824                     /* Now we will get the content of the retransmition and put it in the cache. */
825                     strcpy( aux_member_id.ip, pckt_info.packet_data.retransm_packet.original_sender_id.ip );
826                     aux_member_id.pid=pckt_info.packet_data.retransm_packet.original_sender_id.pid;
827                     pckt_info.packet_data.data_packet = pckt_info.packet_data.retransm_packet.data_packet;
828                     pckt_info.sender_id = aux_member_id;
829 
830                     /* continue... */
831                 case DATA_PACKET_TYPE:
832                     rmcastProcessDataPacket( pckt_info ); /* inserts data in cache... */
833 
834 #ifdef DSINGLE_NACK
835                     fprintf( stderr, "DSINGLE_NACK rmcastReceivePackets: after executing rmcastProcessDataPacket.\n" );
836                     eventListShow( event_list );
837 #endif
838                     break;
839 
840                 default:
841                     fprintf( stderr, "rmcastReceivePackets: Invalid message type.( %d )\n",pckt_info.type );
842                     RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
843 
844             } /* End switch */
845             unlock_eventlist_and_cache( );
846 #ifdef DMUTEX
847             showLogMsg( 'U', NULL );
848             fprintf( stderr, "DMUTEX rmcastReceivePackets: event list and cache UNLOCKED\n" );
849 #endif
850         } /* nbytes > 0 */
851 
852 #ifdef DEBUG_NET_MSGS
853     fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: received, ok!\n" );
854 #endif
855 #ifdef DPROCESSING_TIME
856     gettimeofday( &time_after, 0 );
857     time = *gmtime(( const time_t* )&time_after );
858     time_after.tv_sec = (( time.tm_hour * 60 + time.tm_min ) * 60 ) + time.tm_sec;
859 
860     time_after.tv_sec = time_after.tv_sec - time_before.tv_sec;
861     time_after.tv_usec = time_after.tv_usec - time_before.tv_usec;
862 
863     fprintf( proc_log,"%d %.0Lf\n",tipo, (( long double )time_after.tv_sec*1000000+( long double )time_after.tv_usec ) );
864 #endif
865 
866     } FOREVER;
867 
868     return NULL;
869 }
870 
871 /*****************************************************************************************************
872  *
873  * void  rmcastSendPacket( int soc, BYTE *message, int message_size )
874  *
875  * Sends a message packet of size message_size to socket soc.
876  *
877  * Returns: 1, on success;
878  *          0, on error.
879  *
880  ******************************************************************************************************/
rmcastSendPacket(int soc,BYTE * message,int message_size)881 int rmcastSendPacket( int soc, BYTE *message, int message_size )
882 {
883     PACKET_INFO pckt_info;
884     struct sockaddr_in dest;
885     int sn;
886 
887 #ifdef DEBUG_SHOW2
888     int i;
889 #endif
890 
891     int retval;
892     char type = *(( char* )message );
893     char local_message[ message_size + 1 ];
894     struct timespec sleep_time;
895 
896 #ifdef DEBUG_SHOW2
897     fprintf( stderr, "DEBUG_SHOW2 - rmcastSendPacket: will copy a message of size %d to local var.\n",message_size );
898     for( i=0; i< message_size; i++ )
899     {
900         fprintf( stderr, "( %d )%d",i, ( unsigned int )message[i] );
901     }
902     fprintf( stderr, "\n" );
903 #endif
904 
905     memcpy( local_message, message, message_size );
906     dest.sin_family      = AF_INET;
907     dest.sin_port        = htons   ( local_user_info.group_id.port );
908     dest.sin_addr.s_addr = inet_addr( local_user_info.group_id.ip );
909 
910     if( type == DATA_PACKET_TYPE )
911     {
912           /* REFRESH event is now inserted only after user sends the first packet, i.e., local_user_info.sn==0 */
913         if( local_user_info.sn == 0 )
914         {
915             eventListInsert( &event_list, &( local_user_info.member_id ), NULL, REF_SND_WAIT, -1 );
916         }
917         pthread_mutex_lock( &change_local_user_sn );
918         local_user_info.sn++;
919         pthread_mutex_unlock( &change_local_user_sn );
920     }
921 
922     msgPcktUnmountMessage( &pckt_info, ( BYTE * )local_message );
923 
924     switch( pckt_info.type )
925     {
926         case DATA_PACKET_TYPE:
927             sn = pckt_info.packet_data.data_packet.sn; break;
928         case NAK_PACKET_TYPE:
929             sn = pckt_info.packet_data.nak_packet.sn; break;
930         case RETRANSM_PACKET_TYPE:
931             sn = pckt_info.packet_data.retransm_packet.data_packet.sn; break;
932         case REFRESH_PACKET_TYPE:
933             sn = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent; break;
934         case JOIN_REQUEST_PACKET_TYPE:
935             sn = -1; break;
936         case JOIN_ACCEPT_PACKET_TYPE:
937             sn = -1; break;
938         default:
939             sn = -1;
940     }
941 #ifdef DEBUG_NET_MSGS
942     fprintf( stderr, "DEBUG_NET_MSGS rmcastSendPacket: will send a %s - sn: %d\n", MESSAGE_TYPE_NAME[( int )type],sn );
943 #endif
944 
945     if( logfile != NULL )
946     {
947         showLogMsg( SEND,&pckt_info );
948     }
949 
950     retval = sendto( soc, local_message, message_size, 0, ( struct sockaddr * )&dest, sizeof( dest ) );
951 
952     if( retval < 0 )
953     {
954         fprintf( stderr, "ERROR - rmcastSendPacket: retval < 0\n" );
955         perror( "send_message" );
956 
957         msgPcktShowMessage( message );
958 
959         return 0;
960     }
961     /* fprintf( stderr, " Packet size RML: %d\n",message_size ); FIXME teste pra saber o tamanho dos pacotes */
962 
963     if( ( rmcast_options.microsleep > 0 ) &&( rmcast_options.microsleep < 65536 ) )
964     {
965         /* Converting rmcast_options.microsleep to nanoseconds as needed by nanosleep function */
966         sleep_time.tv_nsec= ( rmcast_options.microsleep*1000 );
967         sleep_time.tv_sec=0;
968         nanosleep( &sleep_time,NULL );
969         /* usleep( rmcast_options.microsleep ); */
970     }
971     return 1;
972 }
973 
974 /*****************************************************************************************************
975  *
976  *  void *rmcastHandleSignals( void *arg )
977  *
978  *  This thread will be the only one that threats the alarm events.
979  *
980  *  Communication with it should be done using SIGUSR1.
981  *
982  *****************************************************************************************************/
983 extern const char * const sys_siglist[];
984 #define _GNU_SOURCE
985 #include <string.h>
986 char *strsignal( int sig );
987 extern const char * const sys_siglist[];
988 
rmcastHandleSignals(void * arg)989 void *rmcastHandleSignals( void *arg )
990 {
991     int sig;
992     int err;
993     int finish = 0;
994 
995 #ifndef SOLARIS
996     struct itimerval value;
997 #else
998      sigset_t     alrmset;
999      sigfillset( &alrmset );
1000      pthread_sigmask( SIG_BLOCK, &alrmset, NULL );
1001 #endif
1002 
1003     while( 1 )
1004     {
1005 #ifdef DEBUG_EVENTS
1006         fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: Before sigwait( )\n" );
1007 #endif
1008         err = sigwait( &alrmset, &sig );
1009 #ifdef DEBUG_EVENTS
1010         fprintf( stderr,  "DEBUG_EVENTS rmcastHandleSignals: After sigwait( ) \n" );
1011 #endif
1012 #ifdef DEBUG_EVENTS
1013 #ifdef SOLARIS
1014         fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals( SOLARIS ): signal   ( %d ) received\n", sig );
1015 #else
1016         fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: signal %s( %d ) received\n",sys_siglist[sig], sig );
1017 #endif
1018 #endif
1019         if( err || ( sig != SIGALRM && sig != SIGUSR1 ) )
1020         {
1021             fprintf( stderr, "rmcastHandleSignals: [%s signal catched]\n",strsignal( sig ) );
1022             if( sig == SIGINT || sig == SIGTERM )
1023             {
1024                 fprintf( stderr, "rmcastHandleSignals: Signal %s received. Exiting.\n",strsignal( sig ) );
1025                 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1026             }
1027             else if( sig == SIGTSTP )
1028             {
1029                 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: SIGSTP received. Nothing to be done.\n" );
1030                 /* FIXME( in a future release, we may be able to handle the SIGTSTP signal ) */
1031             }
1032             else if( sig == SIGUSR2 )
1033             {
1034                 STOP_TRANSMISSION = 1;
1035             }
1036             else continue;
1037         }
1038         lock_eventlist_and_cache( );
1039 
1040 #ifdef DMUTEX
1041         showLogMsg( 'k', NULL );
1042         fprintf( stderr, "DMUTEX rmcastHandleSignals: event list and cache LOCKED\n" );
1043 #endif
1044         if( sig == SIGALRM ||( event_list != NULL && event_list->timer_value == 0 ) )
1045         {
1046 #ifdef DSINGLE_NACK
1047             fprintf( stderr, "DSINGLE_NACK rmcastHandleSignals: calling rmcastCatchEvents\n" );
1048 #endif
1049             finish = rmcastCatchEvents( 0 );
1050 
1051 #ifdef DSINGLE_NACK
1052             fprintf( stderr, "DSINGLE_NACK rmcastHandleSignals: back from rmcastCatchEvents\n" );
1053 #endif
1054             if( finish )
1055                 break;
1056         }
1057 #ifdef DSINGLE_NACK
1058         fprintf( stderr, "DSINGLE_NACK rmcastHandleSignals: showing eventlist:\n" );
1059         eventListShow( event_list );
1060 #endif
1061 
1062         if( event_list != NULL )
1063         {
1064 #ifdef DEBUG_EVENTS
1065            fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: setting timer to %ld( rmcastHandleSignals )\n",event_list->timer_value );
1066 #endif
1067 
1068 #ifndef SOLARIS
1069            setTimerValue( &value,event_list->timer_value );
1070            setitimer( ITIMER_REAL, &value,NULL );
1071 #else
1072            alarm( event_list->timer_value/1000000 + 1 );
1073 #endif
1074         }
1075         else
1076         {
1077 #ifdef DEBUG_EVENTS
1078            fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: resetting timer( rmcastHandleSignals )\n" );
1079 #endif
1080 
1081 #ifndef SOLARIS
1082            setTimerValue( &value,0 );
1083            setitimer( ITIMER_REAL,&value,NULL );
1084 #else
1085            alarm( 0 );
1086 #endif
1087         }
1088         unlock_eventlist_and_cache( );
1089 #ifdef DMUTEX
1090         showLogMsg( 'u', NULL );
1091         fprintf( stderr, "DMUTEX rmcastHandleSignals: cache and event list UNLOCKED\n" );
1092 #endif
1093     }
1094     return  0;
1095 }
1096 
1097 /*****************************************************************************************************
1098  *
1099  * int  rmcastCatchEvents  ( int i )
1100  *
1101  * Catch and execute event list events.
1102  *
1103  ******************************************************************************************************/
rmcastCatchEvents(int i)1104 int rmcastCatchEvents( int i )
1105 {
1106     int retval = 0;
1107     int function_retval = 0;
1108     struct sockaddr_in group;
1109     struct ip_mreq mreq;
1110     CACHE *cache_sender_entry;
1111 #ifndef SINGLE_NACK
1112     int update_nak_list_retval;
1113 #endif
1114     EVENT_LIST first;
1115     int msgsize;
1116     BYTE *message;
1117     PACKET_INFO pack;
1118     DATA_PACKET *data;
1119 
1120 #ifdef DEBUG_EVENTS
1121     fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Event catched!\n" );
1122 #endif
1123     if( event_list == NULL )
1124     {
1125         #ifdef DEBUG_EVENTS
1126         fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Internal warning! first == NULL\n" );
1127         #endif
1128         return retval;
1129     }
1130     first = *event_list;
1131 
1132 #ifdef DSINGLE_NACK
1133     fprintf( stderr, "DSINGLE_NACK rmcastCatchEvents: showing event list:\n" );
1134 #endif
1135     eventListShow( event_list );
1136 
1137 #ifdef DEBUG_EVENTS
1138     fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: event list: %p\n",&( *event_list ) );
1139 #endif
1140 
1141     do
1142     {
1143 #ifdef DEBUG_EVENTS
1144     fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Removing event: action %s timer value: %ld\n",
1145         EVENT_ACTION_NAME[( int )first.action],
1146         first.timer_value );
1147 #endif
1148 #ifdef DNAK_RCV
1149     fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Removing event: action %s timer value: %ld\n",
1150         EVENT_ACTION_NAME[( int )first.action],
1151         first.timer_value );
1152 #endif
1153         if( eventListRemoveFirst( &event_list ) == -1 )
1154         {
1155 #ifdef DEBUG_EVENTS
1156             fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: warning eventListRemoveFirst( &event_list ) == -1\n" );
1157 #endif
1158         }
1159         eventListShow( event_list );
1160 
1161     switch( first.action )
1162     {
1163         case NAK_SND_WAIT:
1164              pack.type=NAK_PACKET_TYPE;
1165              pack.flags=0;
1166              pack.sender_id = local_user_info.member_id;
1167              pack.packet_size=sizeof( NAK_PACKET );
1168              strcpy( pack.packet_data.nak_packet.requested_member_id.ip,first.member_id->ip );
1169              pack.packet_data.nak_packet.requested_member_id.pid=first.member_id->pid;
1170              pack.packet_data.nak_packet.sn=first.sn;
1171 #ifdef SINGLE_NACK
1172             {
1173                  CACHE *aux_cache_member = NULL;
1174                  aux_cache_member = cacheLookForMember( &cache, first.member_id );
1175 
1176                  if( aux_cache_member!=NULL )
1177                  {
1178                      pack.packet_data.nak_packet.base_sn=aux_cache_member->sm_info.member_status.last_seq_rcv;
1179                      pack.packet_data.nak_packet.window_size=aux_cache_member->sm_info.member_status.window_size;
1180 
1181                      window_mask_array2numbers( aux_cache_member->sm_info.member_status.window_mask,
1182                      &pack.packet_data.nak_packet.hmask, &pack.packet_data.nak_packet.lmask, aux_cache_member->sm_info.member_status.window_ini );
1183                  }
1184              }
1185 #endif
1186              msgPcktMountMessage( &pack, &message, &msgsize );
1187 #ifdef DEBUG_EVENTS
1188              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: will send NAK ...\n" );
1189 #endif
1190              rmcastSendPacket( local_user_info.socket, message, msgsize );
1191 
1192 #ifdef DEBUG_EVENTS
1193              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: NAK sent OK.\n" );
1194 #endif
1195              cache_sender_entry = cacheLookForMember( &cache,first.member_id );
1196 
1197              if( cache_sender_entry != NULL )
1198                  retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ), cache_sender_entry->sm_info.member_id.ip, RET_RCV_WAIT, first.sn );
1199 
1200 #ifdef DEBUG_EVENTS
1201              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: RET_RCV_WAIT was inserted in the event list.\n" );
1202 #endif
1203              break;
1204 
1205         case RET_RCV_WAIT:
1206 
1207              /* Next we will check if it is possible to send one more NAK for the lost message,
1208              i.e., the NAK should be sent only rmcast_options.max_nak times */
1209 #ifdef DEBUG_EVENTS
1210              fprintf( stderr, " \nDEBUG_EVENTS - rmcastCatchEvents: Waiting for retransmission timed out\n ip: %s - pid: %d - sn: %d\n",
1211              first.member_id->ip, first.member_id->pid, first.sn );
1212 #endif
1213 #ifndef SINGLE_NACK
1214              /* When we are using SINGLE_NACK this check is done in the event list insert funciton */
1215              if(( update_nak_list_retval = cacheUpdateNakList( &cache, first.member_id, first.sn ) ) > 0 )
1216              {
1217 #endif
1218                  cache_sender_entry = cacheLookForMember( &cache,first.member_id );
1219 
1220                  if( cache_sender_entry != NULL )
1221 #ifdef SINGLE_NACK
1222                      retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ), cache_sender_entry->sm_info.member_id.ip, RET_RCV_EXPIRED, first.sn );
1223 #else
1224                      retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ), cache_sender_entry->sm_info.member_id.ip, NAK_SND_WAIT, first.sn );
1225 #endif /* SINGLE_NACK */
1226 
1227 #ifdef DEBUG_EVENTS
1228                  fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: NAK_SND_WAIT was inserted in the event list.\n" );
1229 #endif
1230 
1231 #ifndef SINGLE_NACK
1232              }else if( update_nak_list_retval == 0 )
1233                  {
1234                      /*
1235                       * We have sent the max number of NAK( see rmcast_options structure ) we didn't recover the packet
1236                       * so we must exit because we cannot keep the reliability
1237                       */
1238                      fprintf( stderr, "********\n" );
1239                      fprintf( stderr, "rmcastCatchEvents ERROR: Recovering packet failed. Max number( %d ) of NAKs reached!\n",rmcast_options.max_nak );
1240                      fprintf( stderr, "\tHost IP:PID=%s:%d\tsn=%d\n",first.member_id->ip,first.member_id->pid,first.sn );
1241                      fprintf( stderr, "********\n" );
1242 
1243                      RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1244              }
1245 #endif
1246         break;
1247 
1248         case RET_SND_WAIT:
1249 
1250              if(( data=cacheLookForMessage( &cache, first.member_id, first.sn ) )!=NULL )
1251              {
1252                  pack.type = RETRANSM_PACKET_TYPE;
1253                  pack.flags = 0;
1254                  pack.sender_id = local_user_info.member_id;
1255                  pack.packet_size = sizeof( RETRANSM_PACKET );
1256                  pack.packet_data.retransm_packet.original_sender_id = *first.member_id;
1257                  pack.packet_data.retransm_packet.data_packet = *data;
1258                  msgPcktMountMessage( &pack, &message, &msgsize );
1259 
1260          #ifdef DEBUG_EVENTS
1261                  fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: will send a RET... " );
1262          #endif
1263                  rmcastSendPacket( local_user_info.socket, message, msgsize );
1264          #ifdef DEBUG_EVENTS
1265                  fprintf( stderr, "ok( rmcastCatchEvents )\n" );
1266          #endif
1267               }
1268 #ifdef DEBUG_EVENTS
1269                  fprintf( stderr, " \nDEBUG_EVENTS - rmcastCatchEvents: Message retransmited\n ip: %s - pid: %d - sn: %d\n",
1270                          first.member_id->ip, first.member_id->pid, first.sn );
1271 #endif
1272         break;
1273 
1274 #ifdef REFRESH
1275         case REF_SND_WAIT:
1276 #ifdef DEBUG_EVENTS
1277              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Processing the refresh event.\n" );
1278 #endif
1279              pack.type = REFRESH_PACKET_TYPE;
1280              pack.flags = 0;
1281              pack.sender_id = local_user_info.member_id;
1282              pack.packet_size = sizeof( REFRESH_PACKET );
1283 
1284              pthread_mutex_lock( &change_local_user_sn );
1285              pack.packet_data.refresh_packet.sn_of_last_msg_sent = local_user_info.sn - 1;
1286              pthread_mutex_unlock( &change_local_user_sn );
1287 
1288 #ifdef DEBUG_EVENTS
1289              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Mounting refresh packet.\n" );
1290 #endif
1291              msgPcktMountMessage( &pack, &message, &msgsize );
1292              if( rmcastSendPacket( local_user_info.socket, message, msgsize ) == 1 )
1293                  eventListInsert( &event_list, NULL, NULL, REF_SND_WAIT, -1 );
1294                  /* We are not supposed to set a valid sn in the sn field
1295                     when we are inserting a REF_SND_WAIT in the event list,
1296                     so we use a -1 value to the sn in this eventListInsert
1297                     function call*/
1298 
1299 #ifdef DEBUG_EVENTS
1300              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: refresh packet mounted.\n" );
1301 #endif
1302 #ifdef DEBUG_EVENTS
1303              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Refresh event processed.\n" );
1304 #endif
1305              break;
1306 
1307 #endif
1308 
1309         case LEV_GRP_WAIT:
1310 
1311 #ifdef DEBUG_EVENTS
1312              fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Processing 'leave group'  event.\n" );
1313 #endif
1314              if(( group.sin_addr.s_addr = inet_addr( local_user_info.group_id.ip ) ) == ( unsigned )-1 )
1315                  perror( "rmcastLeaveGroup: inet_addr" );
1316 
1317              mreq.imr_multiaddr      = group.sin_addr;
1318              mreq.imr_interface.s_addr = INADDR_ANY;
1319 
1320              if( rmcast_options.transmission_mode == MULTICAST )
1321              {
1322                  if( setsockopt( local_user_info.socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, ( char * )&mreq, sizeof( mreq ) ) == -1 )
1323                      perror( "rmcastLeaveGroup: DROP_MEMBERSHIP" );
1324              }
1325 
1326 #ifdef DEBUG_NET_MSGS
1327              fprintf( stderr,  "DEBUG_NET_MSGS rmcastLeaveGroup: disconnected\n" );
1328 #endif
1329              remove_queues( 0 );
1330 
1331              if( rmcast_options.shut_down_routine!=NULL )
1332                  rmcast_options.shut_down_routine( );
1333 
1334              function_retval = 1;
1335              RM_terminate();
1336 
1337              break;
1338 
1339         default:
1340              fprintf( stderr, "rmcastCatchEvents Error: Action Undefined %d\n",first.action );
1341     }
1342 
1343         if( event_list == NULL )
1344             break;
1345 
1346         first = *event_list;
1347 
1348     } while( first.timer_value == 0 && function_retval != 1 );
1349 
1350     eventListShow( event_list );
1351 
1352     return function_retval;
1353 }
1354 /*********************** End of the Main routines *************************************************/
1355 
1356 /**************** Routines to facilitate the interface with the event list and the cache **********/
1357 
1358 /*****************************************************************************************************
1359  *
1360  * void rmcastProcessDataPacket( PACKET_INFO pckt_info )
1361  *
1362  * Processes the data packet.
1363  *
1364  * Arguments: the packet to be processed, in pckt_info;
1365  *
1366  * Side effects: may affect the event list, the cache and the message queue.
1367  *
1368  ******************************************************************************************************/
rmcastProcessDataPacket(PACKET_INFO pckt_info)1369 void rmcastProcessDataPacket( PACKET_INFO pckt_info )
1370 {
1371     int i, retval;
1372 
1373     MEMBER_ID *aux_member_id = &( pckt_info.sender_id );
1374     CACHE_NODE *aux_pckt_info;
1375     CACHE *p_old_cache_sender_entry = NULL, *p_new_cache_sender_entry = NULL;
1376 #ifdef SINGLE_NACK
1377     CACHE *cache_aux1_entry = NULL;
1378 #endif
1379     CACHE *aux_cache_member;
1380     CACHE old_cache_sender_entry, new_cache_sender_entry;
1381 
1382     bzero(( &old_cache_sender_entry ),sizeof( CACHE ) );
1383 
1384    ( old_cache_sender_entry ).sm_info.member_status.last_identified = -1;
1385    ( old_cache_sender_entry ).sm_info.member_status.first_rcv       = -1;
1386    ( old_cache_sender_entry ).sm_info.member_status.last_rcv        = -1;
1387    ( old_cache_sender_entry ).sm_info.member_status.last_seq_rcv    = -1;
1388 
1389 #ifdef SINGLE_NACK
1390    ( old_cache_sender_entry ).sm_info.member_status.window_size = MAX_WINDOW_SIZE;
1391    ( old_cache_sender_entry ).sm_info.member_status.window_ini = 0;
1392     memset(( old_cache_sender_entry ).sm_info.member_status.window_mask, 0, MAX_WINDOW_SIZE );
1393 #endif
1394 
1395     p_old_cache_sender_entry = cacheLookForMember( &cache, aux_member_id );
1396 
1397     if( ( p_old_cache_sender_entry != NULL ) )
1398     {
1399         old_cache_sender_entry = *p_old_cache_sender_entry;
1400 
1401         if( pckt_info.type == DATA_PACKET_TYPE )
1402         {
1403             /* Delete all events about this packet */
1404 
1405             if( !rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) )
1406             {
1407                 #ifdef DSINGLE_NACK
1408                 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: no NAK_SND_WAIT event found.\n" );
1409                 #endif
1410                 if( !rmcastRemoveEvent( &pckt_info,RET_RCV_WAIT ) )
1411                 {
1412                     #ifdef DSINGLE_NACK
1413                     fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: no RET_RCV_WAIT found.\n" );
1414                     #endif
1415 
1416                     rmcastRemoveEvent( &pckt_info,RET_SND_WAIT );
1417 
1418                     #ifdef DSINGLE_NACK
1419                     fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: After rmcastRemoveEvent( &pckt_info,RET_SND_WAIT ).\n" );
1420                     #endif
1421                 }
1422             }
1423         }
1424     }
1425 
1426     pckt_info.type = DATA_PACKET_TYPE;
1427 #ifdef DEBUG_CACHE
1428     fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: Inserting data in cache: ip: %s pid: %d\n",aux_member_id->ip,aux_member_id->pid );
1429 #endif
1430     aux_cache_member=cacheLookForMember( &cache, &( pckt_info.sender_id ) );
1431 
1432 #ifdef SINGLE_NACK /*block begin*/
1433 {
1434     int aux_sn;
1435     int aux_cont;
1436     int aux_last_seq_rcv;
1437     EVENT_LIST *aux_list_node;
1438 
1439     aux_sn   = -1;
1440     aux_cont = 0;
1441     aux_list_node    = NULL;
1442     aux_last_seq_rcv = -1;
1443 
1444     if( aux_cache_member != NULL )
1445     {
1446         aux_last_seq_rcv = aux_cache_member->sm_info.member_status.last_seq_rcv;
1447     }
1448 #endif
1449      if(( aux_cache_member == NULL ) ||( pckt_info.packet_data.data_packet.sn > aux_cache_member->sm_info.member_status.last_seq_rcv ) )
1450         retval = cacheInsertMessage( &cache, &( pckt_info.sender_id ), &( pckt_info.packet_data.data_packet ) );
1451      else
1452         return; /* Old packet. We don't need to handle it again */
1453 
1454     p_new_cache_sender_entry = cacheLookForMember( &cache, aux_member_id );
1455     /* We don't need to test if p_new_cache_sender_entry is NULL
1456        because we have just inserted it into the cache */
1457 #ifdef SINGLE_NACK
1458      /* When we are sliding the window we must check if we have to send a nack to the next sn available in the window_mask */
1459 
1460     if( p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv > aux_last_seq_rcv )
1461     {
1462 
1463         aux_cont = ( p_new_cache_sender_entry->sm_info.member_status.last_identified -
1464                     p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv );
1465 
1466         if( aux_cont > MAX_WINDOW_SIZE )
1467         {
1468             aux_cont = MAX_WINDOW_SIZE;
1469         }
1470 
1471         for( i = p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv + 1;
1472               i <= ( aux_cont + p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv );
1473               i++ )
1474         {
1475             aux_sn = i;
1476 
1477             if( cacheLookForMessage( &cache, &( pckt_info.sender_id ), aux_sn ) == NULL )
1478             {
1479 #ifdef DSINGLE_NACK
1480                 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: trying to insert NAK_SND_WAIT event, sn=%d\n",aux_sn );
1481 #endif
1482                 if( eventListFind( &event_list,&( pckt_info.sender_id ),NAK_SND_WAIT,aux_sn,&aux_list_node ) != NULL )
1483                 {
1484 #ifdef DSINGLE_NACK
1485                     fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: NAK_SND_WAIT exists,just set sn=%d bit\n",aux_sn );
1486 #endif
1487                     window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,aux_sn,&( aux_cache_member->sm_info.member_status ),NAK_SND_WAIT );
1488                 }
1489                 else
1490                 {
1491                     if( eventListFind( &event_list,&( pckt_info.sender_id ),RET_RCV_WAIT,aux_sn,&aux_list_node ) != NULL )
1492                     {
1493 #ifdef DSINGLE_NACK
1494                         fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: RET_RCV_WAIT event exists, just set sn=%d bit\n",aux_sn );
1495 #endif
1496                         window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,aux_sn,&( aux_cache_member->sm_info.member_status ),RET_RCV_WAIT );
1497                     }
1498                     else
1499                     {
1500 #ifdef DSINGLE_NACK
1501                         fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket:Inserting NAK_SND_WAIT event, sn=%d:\n",aux_sn );
1502 #endif
1503                         eventListInsert( &event_list,&( aux_cache_member->sm_info.member_id ),aux_cache_member->sm_info.member_id.ip, NAK_SND_WAIT,aux_sn );
1504                     }
1505                 }
1506             }
1507         }
1508     }
1509 }
1510 #endif    /*SINGLE_NACK block end*/
1511 
1512 
1513     new_cache_sender_entry = *p_new_cache_sender_entry;
1514     cacheShow( cache );
1515     eventListShow( event_list );
1516     aux_pckt_info = new_cache_sender_entry.last_inserted;
1517 
1518     if( retval == 1 ) /* the inserted packet is in sequence -> we will send data to the application */
1519     {
1520 #ifdef DEBUG_CACHE
1521         fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: The inserted packet is in sequence...\n" );
1522 #endif
1523         for( i  = old_cache_sender_entry.sm_info.member_status.last_seq_rcv + 1;
1524              i <= new_cache_sender_entry.sm_info.member_status.last_seq_rcv;
1525              i++ )
1526         {
1527     #ifdef DEBUG_CACHE
1528             fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: A \"reliable\" message will be enqueued:( sn: %d size: %d )\n",
1529                         i, pckt_info.packet_data.data_packet.data_size );
1530     #endif
1531             if( messageQueueEnter(( char* )( aux_pckt_info->data_pckt.data ),pckt_info.packet_data.data_packet.data_size ) != -1 )
1532             {
1533                 aux_pckt_info = aux_pckt_info->previous;
1534             }
1535             else
1536             {
1537                 fprintf( stderr, "messageQueueEnter ERROR!\n" );
1538                 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1539             }
1540         }
1541 #ifdef DSINGLE_NACK
1542         eventListShow( event_list );
1543 #endif
1544     }
1545     else if( retval == 2 ) /* the inserted packet is not in sequence -> we will send nak's through the network */
1546     {
1547 #ifdef DEBUG_CACHE
1548         fprintf( stderr, "DEBUG_CACHE rmcastprocessdatapacket: The inserted packet is NOT in sequence... \n" );
1549         fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: showing eventlist:\n" );
1550         eventListShow( event_list );
1551 #endif
1552         for( i = old_cache_sender_entry.sm_info.member_status.last_identified+1;
1553              i <  new_cache_sender_entry.sm_info.member_status.last_identified;
1554              i++ )
1555         {
1556             pckt_info.packet_data.data_packet.sn = i;
1557 
1558             if( cacheLookForMessage( &cache, aux_member_id, i )==NULL )
1559             {
1560 
1561 #ifdef DEBUG_EVENTS
1562                 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: trying to schedule a NAK_SND_WAIT event to( sn: %d )\n", i );
1563                 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: new_cache_sender_entry.sm_info.member_status.last_seq_rcv=%d\n",
1564                                 new_cache_sender_entry.sm_info.member_status.last_seq_rcv );
1565 #endif
1566 #ifdef SINGLE_NACK
1567                 if( i < new_cache_sender_entry.sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE )
1568                 {
1569 #endif
1570                     if( !rmcastFindEvent( &pckt_info,NAK_SND_WAIT ) )
1571                     {
1572                         if( !rmcastFindEvent( &pckt_info, RET_RCV_WAIT ) )
1573                         {
1574 #ifndef SINGLE_NACK
1575                             /* when we are using SINGLE_NACK this check is done inside eventListInsert */
1576                             if( cacheUpdateNakList( &cache, &( pckt_info.sender_id ), i )!=0 )
1577                             {
1578 #endif
1579                                 rmcastInsertEvent( &pckt_info,NAK_SND_WAIT );
1580                                 #ifdef DEBUG_EVENTS
1581                                 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: NAK_SND_WAIT was inserted in the event list.\n" );
1582                                 #endif
1583 #ifndef SINGLE_NACK
1584                             }
1585                             else
1586                             {
1587                                 /*
1588                                 * We have sent the max number of NAK( see rmcast_options structure ) we didn't recover the packet
1589                                 * so we must exit because we cannot keep the reliability
1590                                 */
1591                                 fprintf( stderr, "********\n" );
1592                                 fprintf( stderr, "rmcastProcessDataPacket ERROR: Recovering packet failed. Max number( %d ) of NAKs reached!\n",rmcast_options.max_nak );
1593                                 fprintf( stderr, "\tHost IP:PID=%s:%d\tsn=%d\n",aux_member_id->ip,aux_member_id->pid,i );
1594                                 fprintf( stderr, "********\n" );
1595 
1596                                 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1597                             }
1598             #endif
1599                         }
1600 #ifdef SINGLE_NACK
1601                         else
1602                         {
1603                              cache_aux1_entry=cacheLookForMember( &cache, aux_member_id );
1604                              #ifdef DSINGLE_NACK
1605                               fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: before window_mask_set_bit\n" );
1606                               fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: there is a RET_RCV_WAIT, just set sn=%d\n",i );
1607                              #endif
1608                              window_mask_set_bit( cache_aux1_entry->sm_info.member_status.window_mask,
1609                                                  i,&cache_aux1_entry->sm_info.member_status,RET_RCV_WAIT );
1610                         }
1611 #endif
1612                     }
1613 #ifdef SINGLE_NACK
1614                     else
1615                     {
1616                         int aux_retval=0;
1617 
1618                     #ifdef DSINGLE_NACK
1619                         fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: found a NAK_SND_WAIT event.\n" );
1620                     #endif
1621                         cache_aux1_entry=cacheLookForMember( &cache, aux_member_id );
1622 
1623                     #ifdef DWINDOW_MASK
1624                         fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: before window_mask_get_bit\n" );
1625                         fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: window_mask=%p\n",cache_aux1_entry->sm_info.member_status.window_mask );
1626                         fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: sn=%d\n",i );
1627                         fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: member_status=%p\n",&cache_aux1_entry->sm_info.member_status );
1628                     #endif
1629 
1630                         aux_retval=window_mask_get_bit( cache_aux1_entry->sm_info.member_status.window_mask,i,&cache_aux1_entry->sm_info.member_status );
1631 
1632                         if( aux_retval != RET_RCV_WAIT ) /* FIXME - este teste pode nao ser necessario pois nao queremos eventos NAK e RET_RCV concorrentes */
1633                         {
1634                             if( cacheUpdateNakList( &cache, &( pckt_info.sender_id ), i )!=0 )
1635                             {
1636                                 #ifdef DEBUG_EVENTS
1637                                 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: NAK_SND_WAIT event exists, just set sn=%d to NAK_SND_WAIT\n",i );
1638                                 #endif
1639                                 window_mask_set_bit( cache_aux1_entry->sm_info.member_status.window_mask,i,&cache_aux1_entry->sm_info.member_status,NAK_SND_WAIT );
1640                             }
1641                             else
1642                             {
1643                                 /*
1644                                 * We have sent the max number of NAK( see rmcast_options structure ) we didn't recover the packet
1645                                 * so we must exit because we cannot keep the reliability
1646                                 */
1647 
1648                                 fprintf( stderr, "********\n" );
1649                                 fprintf( stderr, "rmcastProcessDataPacket ERROR: Recovering packet failed. Max number( %d ) of NAKs reached!\n",rmcast_options.max_nak );
1650                                 fprintf( stderr, "\tHost IP:PID=%s:%d\tsn=%d\n",aux_member_id->ip,aux_member_id->pid,i );
1651                                 fprintf( stderr, "********\n" );
1652 
1653                                 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1654                             }
1655                         }
1656                     }
1657 
1658                 } /* i is in the window_mask range*/
1659 #endif /*SINGLE_NACK */
1660             } /* message exists we don't need to send a NACK */
1661         } /* for( i = old_cache_sender_entry.sm_info.member_status.last_identified+1; ... */
1662     } /*else if( retval==2 ) */
1663     else
1664     {
1665         #ifdef DEBUG_CACHE
1666          fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: Couldn�t insert cache node.\n" );
1667         #endif
1668     }
1669 
1670 }
1671 
1672 /*****************************************************************************************************
1673  *
1674  * void rmcastProcessRefreshPacket( PACKET_INFO pckt_info )
1675  *
1676  * Processes the refresh packet.
1677  *
1678  * Arguments: the packet to be processed, in pckt_info;
1679  *
1680  * Side effects: may affect the event list, the cache and the message queue.
1681  *
1682  ******************************************************************************************************/
1683 #ifdef REFRESH
rmcastProcessRefreshPacket(PACKET_INFO pckt_info)1684 void rmcastProcessRefreshPacket( PACKET_INFO pckt_info )
1685 {
1686     int i, new_value_of_last_identified;
1687     CACHE *cache_sender_entry;
1688     cache_sender_entry = cacheLookForMember( &cache, &( pckt_info.sender_id ) );
1689 
1690     if( pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent < 0 )
1691         return;
1692 
1693     if( cache_sender_entry == NULL )
1694     {
1695         cache_sender_entry  = cacheInsertMember( &cache, &( pckt_info.sender_id ) );
1696     }
1697 
1698     new_value_of_last_identified = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent;
1699 
1700 #ifdef SINGLE_NACK
1701     if( new_value_of_last_identified >( cache_sender_entry->sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE ) )
1702     {
1703         /* Send NACKs that fit in the window */
1704         new_value_of_last_identified = cache_sender_entry->sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE;
1705     }
1706 
1707     for( i = new_value_of_last_identified;
1708          i > cache_sender_entry->sm_info.member_status.last_identified;
1709          i -- )
1710     {
1711         if( !rmcastFindEvent( &pckt_info,NAK_SND_WAIT ) )
1712         {
1713             if( !rmcastFindEvent( &pckt_info,RET_RCV_WAIT ) )
1714             {
1715                 pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent = i;
1716                 rmcastInsertEvent( &pckt_info,NAK_SND_WAIT );
1717 #ifdef DSINGLE_NACK
1718                 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: NAK_SND_WAIT insertion caused by refresh message\n" );
1719                 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: host:pid:sn=%s:%d:%d\n",
1720                                 pckt_info.sender_id.ip,pckt_info.sender_id.pid,i );
1721 #endif
1722             }
1723             else
1724             {
1725                 /* RET_RCV_WAIT exists, just set the i bit in the window_mask */
1726                 window_mask_set_bit( cache_sender_entry->sm_info.member_status.window_mask,
1727                                      i,
1728                                      &( cache_sender_entry->sm_info.member_status ),
1729                                      RET_RCV_WAIT );
1730 #ifdef DSINGLE_NACK
1731                 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: bit settled to RET_RCV_WAIT caused by refresh message\n" );
1732                 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: host:pid:sn=%s:%d:%d\n",
1733                                 pckt_info.sender_id.ip,pckt_info.sender_id.pid,i );
1734 #endif
1735             }
1736         }
1737         else
1738         {
1739             /* NAK_SND_WAIT exists, just set the i bit in the window_mask */
1740             window_mask_set_bit( cache_sender_entry->sm_info.member_status.window_mask,
1741                                  i,
1742                                  &( cache_sender_entry->sm_info.member_status ),
1743                                  NAK_SND_WAIT );
1744 #ifdef DSINGLE_NACK
1745             fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: bit settled to NAK_RCV_WAIT caused by refresh message\n" );
1746             fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: host:pid:sn=%s:%d:%d\n",
1747                             pckt_info.sender_id.ip,pckt_info.sender_id.pid,i );
1748 #endif
1749         }
1750     }
1751 #else
1752     for( i = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent;
1753          i > cache_sender_entry->sm_info.member_status.last_identified;
1754          i -- )
1755         {
1756            #ifdef DEBUG_EVENTS
1757               fprintf( stderr, "Will wait to send a nak, i: %d last received or nak: %d.\n", i,
1758                     cache_sender_entry->sm_info.member_status.last_identified );
1759            #endif
1760 
1761             pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent = i;
1762             rmcastInsertEvent( &pckt_info,NAK_SND_WAIT );
1763         }
1764 #endif /* SINGLE_NACK */
1765    if( cache_sender_entry->sm_info.member_status.last_identified < new_value_of_last_identified )
1766        cache_sender_entry->sm_info.member_status.last_identified = new_value_of_last_identified;
1767 }
1768 #endif
1769 
1770 /*****************************************************************************************************
1771  *
1772  * int rmcastRemoveEvent( PACKET_INFO *packt, int event_type )
1773  *
1774  * Removes an event from the event list.
1775  *
1776  * Arguments: the network packet, in packt;
1777  *            the event type, in event_type.
1778  *
1779  * Returns: 1, on sucess;
1780  *          0, otherwise.
1781  *
1782  * Side effects: it affects the event list.
1783  *
1784  ******************************************************************************************************/
rmcastRemoveEvent(PACKET_INFO * packt,int event_type)1785 int rmcastRemoveEvent( PACKET_INFO *packt, int event_type )
1786 {
1787     int retval;
1788 
1789     retval = 0;
1790 
1791     switch( packt->type )
1792     {
1793         case RETRANSM_PACKET_TYPE:
1794              retval = eventListRemove( &event_list, &( packt->packet_data.retransm_packet.original_sender_id ),
1795                                      event_type, packt->packet_data.retransm_packet.data_packet.sn );
1796              break;
1797 
1798         case DATA_PACKET_TYPE:
1799              retval = eventListRemove( &event_list, &( packt->sender_id ),
1800                                      event_type, packt->packet_data.data_packet.sn );
1801              break;
1802         case NAK_PACKET_TYPE:
1803              #ifdef DNAK_RCV
1804                  fprintf( stderr,  "rmcastRemoveEvent: calling eventListRemove\n" );
1805              #endif
1806              retval = eventListRemove( &event_list, &( packt->packet_data.nak_packet.requested_member_id ),
1807                                      event_type, packt->packet_data.nak_packet.sn );
1808              #ifdef DNAK_RCV
1809                  fprintf( stderr,  "rmcastRemoveEvent: after calling eventListRemove\n" );
1810              #endif
1811              break;
1812     }
1813     return  retval;
1814 }
1815 
1816 /*****************************************************************************************************
1817  *
1818  * int rmcastFindEvent( PACKET_INFO *packt, int event_type )
1819  *
1820  * Searches for an event in the event list.
1821  *
1822  * Arguments: the network packet, in packt;
1823  *            the event type, in event_type.
1824  *
1825  * Returns: 1, on sucess;
1826  *          0, otherwise.
1827  *
1828  ******************************************************************************************************/
rmcastFindEvent(PACKET_INFO * packt,int event_type)1829 int rmcastFindEvent( PACKET_INFO *packt, int event_type )
1830 {
1831     int retval;
1832 
1833     EVENT_LIST *aux;
1834 
1835     switch( packt->type )
1836     {
1837         case NAK_PACKET_TYPE:
1838              #ifdef DNAK_RCV
1839                  fprintf( stderr,  "rmcastFindEvent: calling eventlistFind to look for event:%d\n",event_type );
1840              #endif
1841 
1842              retval = (( eventListFind( &event_list, &( packt->packet_data.nak_packet.requested_member_id ),
1843                                       event_type, packt->packet_data.nak_packet.sn,&aux ) )!= NULL );
1844              #ifdef DNAK_RCV
1845                  fprintf( stderr,  "rmcastFindEvent: after calling eventlistFind to look for event:%d, retval=%d\n",event_type,retval );
1846              #endif
1847              break;
1848         case DATA_PACKET_TYPE:
1849              retval = (( eventListFind( &event_list, &( packt->sender_id ),
1850                                      event_type, packt->packet_data.data_packet.sn,&aux ) )!=NULL );
1851              break;
1852         case REFRESH_PACKET_TYPE:
1853              retval = (( eventListFind( &event_list, &( packt->sender_id ),
1854                                      event_type, packt->packet_data.refresh_packet.sn_of_last_msg_sent,&aux ) )!=NULL );
1855              break;
1856         default:
1857              fprintf( stderr, "rmcastFindEvent ERROR: Not yet implemented event=%d packet_type=%d.\n",event_type,packt->type );
1858              return 0;
1859     }
1860     return retval;
1861 }
1862 
1863 /*****************************************************************************************************
1864  *
1865  * int rmcastCacheContains( PACKET_INFO *packt )
1866  *
1867  * Searches for a packet in the cache.
1868  *
1869  * Arguments: the network packet, in packt;
1870  *
1871  * Returns: 1, on sucess;
1872  *          0, otherwise.
1873  *
1874  ******************************************************************************************************/
rmcastCacheContains(PACKET_INFO * packt)1875 int rmcastCacheContains( PACKET_INFO *packt )
1876 {
1877     int retval;
1878 
1879     retval = 0;
1880 
1881     switch( packt->type )
1882     {
1883         case NAK_PACKET_TYPE:
1884              retval = (( cacheLookForMessage( &cache, &( packt->packet_data.nak_packet.requested_member_id ),
1885                                            packt->packet_data.nak_packet.sn ) )!= NULL );
1886              break;
1887         case RETRANSM_PACKET_TYPE:
1888              retval = (( cacheLookForMessage( &cache, &( packt->packet_data.retransm_packet.original_sender_id ),
1889                                            packt->packet_data.retransm_packet.data_packet.sn ) )!= NULL );
1890              break;
1891         default:
1892              fprintf( stderr, "rmcastCacheContains: Unknown packet type:%d\n",packt->type );
1893              RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1894     }
1895     return retval;
1896 }
1897 
1898 /*****************************************************************************************************
1899  *
1900  * int rmcastInsertEvent( PACKET_INFO *packt, int event_type )
1901  *
1902  * Inserts an event in the event list.
1903  *
1904  * Arguments: the network packet, in packt;
1905  *            the event type, in event_type.
1906  *
1907  * Returns: 1, on sucess;
1908  *          0, otherwise.
1909  *
1910  * Side effects: it affects the event list.
1911  *
1912  ******************************************************************************************************/
rmcastInsertEvent(PACKET_INFO * packt,int event_type)1913 int rmcastInsertEvent( PACKET_INFO *packt, int event_type )
1914 {
1915     int retval;
1916     CACHE *cache_sender_entry;
1917 
1918     retval = 0;
1919 
1920     switch( packt->type )
1921     {
1922         case NAK_PACKET_TYPE:
1923              cache_sender_entry = cacheLookForMember( &cache,
1924                                          &( packt->packet_data.nak_packet.requested_member_id ) );
1925              #ifdef DNAK_RCV
1926                  fprintf( stderr,  "rmcastInsertEvent: calling eventListInsert\n" );
1927              #endif
1928 
1929              if( cache_sender_entry != NULL )
1930                  retval = eventListInsert( &event_list, &( cache_sender_entry->sm_info.member_id ),
1931                                          packt->sender_id.ip, event_type, ( packt->packet_data.nak_packet.sn ) );
1932              #ifdef DNAK_RCV
1933                  fprintf( stderr,  "rmcastInsertEvent: after calling eventListInsert cache_sender_entry:%p\n",cache_sender_entry );
1934              #endif
1935 
1936              break;
1937         case DATA_PACKET_TYPE:
1938              cache_sender_entry = cacheLookForMember( &cache, &( packt->sender_id ) );
1939              if( cache_sender_entry!=NULL )
1940                  retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ),
1941                                       packt->sender_id.ip, event_type, packt->packet_data.data_packet.sn );
1942              break;
1943         case REFRESH_PACKET_TYPE:
1944              cache_sender_entry = cacheLookForMember( &cache, &( packt->sender_id ) );
1945 
1946              if( cache_sender_entry!=NULL )
1947                  retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ),
1948                                       packt->sender_id.ip,event_type, packt->packet_data.refresh_packet.sn_of_last_msg_sent );
1949 
1950              break;
1951         default:
1952              fprintf( stderr, "rmcastInsertEvent: Unknown event type:%d\n",packt->type );
1953              RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1954 
1955     }
1956 #ifdef DNAK_RCV
1957     if( event_type == NAK_PACKET_TYPE )
1958     {
1959         fprintf( stderr, "rmcastInsertEvent: Show cache after NACK event insertion\n" );
1960         cacheShow( cache );
1961     }
1962 #endif
1963 
1964     return retval;
1965 }
1966 
1967 /**********End of the Routines to facilitate the interface with the event list and the cache ******/
1968 #endif
1969