1 /*
2 * Copyright (C) 2002-2009, Edmundo Albuquerque de Souza e Silva.
3 *
4 * This file may be distributed under the terms of the Q Public License
5 * as defined by Trolltech AS of Norway and appearing in the file
6 * LICENSE.QPL included in the packaging of this file.
7 *
8 * THIS FILE IS PROVIDED AS IS WITH NO WARRANTY OF ANY KIND, INCLUDING
9 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
10 * PURPOSE. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
11 * INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
12 * FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
13 * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
14 * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 *
16 */
17
18 /***************************************************************************
19 rminternals.c
20 -------------------
21 begin : May 2001
22 Authors : Jorge Allyson Azevedo
23 Milena Scanferla
24 Daniel Sadoc
25 email : {allyson,milena,sadoc}@land.ufrj.br
26 ***************************************************************************/
27
28 #ifndef RMINTERNALS_C
29 #define RMINTERNALS_C
30
31 #include "rminternals.h"
32 #include "rmcast.h"
33
34 #include <pthread.h>
35 #include <string.h>
36 #include <signal.h>
37 #include <errno.h>
38
39 #include <sys/time.h>
40 #include <unistd.h>
41 #include <stdio.h>
42 #include <time.h>
43
44 #include "rmwinmask.h"
45
46 extern pthread_t rcv_thread, signal_handler_thread;
47 extern char *EVENT_ACTION_NAME[6];
48 extern char *MESSAGE_TYPE_NAME[NUMBER_OF_MESSAGE_TYPES];
49 FILE *logfile=NULL;
50
51 #ifdef DSUPPRESSION
52 extern FILE *suppressionfile;
53 #endif
54
55 int STOP_TRANSMISSION=0;
56
57 EVENT_LIST *event_list=NULL;
58
59 #ifdef SINGLE_NACK
60
61 EVENT_LIST *nak_event=NULL;
62 EVENT_LIST *wait_for_retransm_event=NULL;
63
64 #endif
65
66 CACHE *cache=NULL;
67 USER_INFO local_user_info;
68 sigset_t alrmset;
69
70 GLOBAL_OPTIONS rmcast_options = {
71 /* int *pipe; */
72 0 ,
73 /* char tcp_ip[IP_STRING_SIZE]; */
74 { 0 },
75 /* int tcp_port; */
76 10013,
77 /* void ( *shut_down_routine )( void ) = NULL; */
78 0,
79 /* int cur_state_server_is_up = 0; */
80 0,
81 /* char dest_ip[IP_STRING_SIZE]; */
82 { "225.1.2.3" } ,
83 /* int dest_port; */
84 5151,
85 /* int ttl; */
86 1,
87 /* unsigned int microsleep; */
88 10,
89 /* int hosts_identified; default 0*/
90 0,
91 /* int timer_distribution; */
92 UNIFORM,
93 /* int max_nak; default 100 */
94 100,
95 /* int max_member_cache_size; default 4000 */
96 4000,
97 /* int version; */
98 1,
99 /* int transmission_mode; */
100 MULTICAST,
101 /* char log_file[255]; */
102 "NULL",
103 /* int new_member_support; */
104 0,
105 /* int statistics; */
106 0,
107 /* int refresh_timer; */
108 10,
109 /* float loss_prob; */
110 0,
111 /* int leave_group_wait_time; */
112 500000,
113 /* int rcv_buffer_size; */
114 10000,
115 /* HOSTS_DELAYS hosts_delays */
116 {
117 { "DEFAULT", /* host IP */
118 50000 /* one-way delay to host( microseconds ) */
119 }
120 },
121 /* parameters to obtain the timer interval */
122 2,
123 2,
124 5,
125 2,
126 2,
127 2
128 };
129
130 extern pthread_mutex_t change_local_user_sn;
131 extern pthread_mutex_t event_list_mutex;
132 extern pthread_mutex_t cache_mutex;
133
134 /*********************** Main routines ************************************************************/
135
lock_eventlist_and_cache()136 void lock_eventlist_and_cache( )
137 {
138 pthread_mutex_lock( &event_list_mutex );
139 pthread_mutex_lock( &cache_mutex );
140 }
141
unlock_eventlist_and_cache()142 void unlock_eventlist_and_cache( )
143 {
144 pthread_mutex_unlock( &cache_mutex );
145 pthread_mutex_unlock( &event_list_mutex );
146 }
147
148
149 /*****************************************************************************************************
150 *
151 * int rmcastCreateSocket ( int port )
152 *
153 * Creates a multi-cast socket, using the specified port.
154 *
155 * Returns the socket handler.
156 *
157 ******************************************************************************************************/
rmcastCreateSocket(int port)158 int rmcastCreateSocket( int port )
159 {
160
161 int aux = 1 ;
162 struct sockaddr_in saddr;
163
164 local_user_info.socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
165
166 bzero(( char* ) &saddr, sizeof( saddr ) );
167
168 saddr.sin_family = AF_INET;
169 saddr.sin_port = htons( port );
170 saddr.sin_addr.s_addr = htonl( INADDR_ANY );
171
172 if( setsockopt( local_user_info.socket, SOL_SOCKET, SO_REUSEADDR, ( char* )&aux, sizeof( aux ) ) == -1 )
173 perror( "reuse addr error" );
174
175 if( bind( local_user_info.socket, ( struct sockaddr * )&saddr, sizeof( saddr ) ) == -1 )
176 {
177 perror( "binding error" );
178 close( local_user_info.socket );
179 local_user_info.socket = -1;
180 }
181
182 if( rmcast_options.transmission_mode == MULTICAST )
183 {
184 rmcastSetTTLValue( local_user_info.socket, rmcast_options.ttl );
185 rmcastSetLoopBack( local_user_info.socket, 1 );
186 }
187
188 return( local_user_info.socket );
189 }
190
rmcastSetTTLValue(int soc,int ttl)191 void rmcastSetTTLValue( int soc, int ttl )
192 {
193 #ifndef SOLARIS
194 if( setsockopt( soc, IPPROTO_IP, IP_MULTICAST_TTL, ( char* )&ttl, sizeof( ttl ) ) ==-1 )
195 perror( "ttl" );
196 #endif
197 }
198
rmcastSetLoopBack(int soc,int loop)199 void rmcastSetLoopBack( int soc, int loop )
200 {
201 #ifndef SOLARIS
202 if( setsockopt( soc, IPPROTO_IP, IP_MULTICAST_LOOP, ( char* )&loop, sizeof( loop ) ) ==-1 )
203 perror( "loop" );
204 #endif
205 }
206
207 /*****************************************************************************************************
208 *
209 * void rmcastJoinGroup ( int soc, char *group_addr )
210 *
211 * Joins the user using the socket soc to the multicast group.
212 *
213 * Arguments: the socket, in soc;
214 * the multicast group ip, in group_addr
215 *
216 ******************************************************************************************************/
rmcastJoinGroup(int soc,char * group_addr)217 void rmcastJoinGroup ( int soc, char *group_addr )
218 {
219 struct sockaddr_in group;
220 struct ip_mreq mreq;
221
222 if( rmcast_options.transmission_mode == MULTICAST )
223 {
224 fprintf( stdout,"rmcastJoinGroup: Joining group %s\n", group_addr );
225
226 if(( group.sin_addr.s_addr = inet_addr( group_addr ) ) == ( unsigned )-1 )
227 perror( "rmcastJoingGroup: inet_addr" );
228
229 mreq.imr_multiaddr = group.sin_addr;
230 mreq.imr_interface.s_addr = INADDR_ANY;
231
232 if( setsockopt( soc, IPPROTO_IP, IP_ADD_MEMBERSHIP, ( char * )&mreq, sizeof( mreq ) ) == -1 )
233 perror( "rmcastJoingGroup: ADD_MEMBERSHIP" );
234 }
235 }
236
237 /*****************************************************************************************************
238 *
239 * void rmcastLeaveGroup ( int soc, char *group_addr )
240 *
241 * Used when the user using the socket soc wants to leave the multicast group.
242 *
243 * Arguments: the socket, in soc;
244 * the multicast group ip, in group_addr
245 * Returns: 1, on success;
246 * 0, on error.
247 *
248 ******************************************************************************************************/
rmcastLeaveGroup(int soc,char * group_addr)249 int rmcastLeaveGroup ( int soc, char *group_addr )
250 {
251 BYTE *buffer;
252 int buffsize = 0;
253
254 PACKET_INFO pckt_info;
255
256 #ifdef REFRESH
257 /* Mounting and sending a Refresh packet before leaving the group but only
258 if we have already sent one data packet */
259 if( local_user_info.sn > 0 ){
260
261 pckt_info.type = REFRESH_PACKET_TYPE;
262 pckt_info.flags = 0;
263 pckt_info.sender_id = local_user_info.member_id;
264 pckt_info.packet_size = sizeof( REFRESH_PACKET );
265
266 pthread_mutex_lock( &change_local_user_sn );
267 pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent = local_user_info.sn - 1;
268 pthread_mutex_unlock( &change_local_user_sn );
269
270 #ifdef DEBUG_EVENTS
271 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Mounting refresh packet.\n" );
272 #endif
273 msgPcktMountMessage( &pckt_info, ( BYTE** )&buffer, &buffsize );
274 if ( rmcastSendPacket( soc, buffer, buffsize ) == 0 )
275 return 0;
276 }
277 #endif /* REFRESH */
278
279 /* Mounting and sending a Leave group packet */
280 pckt_info.type = LEAVE_GROUP_PACKET_TYPE;
281 pckt_info.flags = 0;
282 pckt_info.sender_id = local_user_info.member_id;
283 pckt_info.packet_size = 0;
284
285 #ifdef DEBUG_NET_MSGS
286 fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: mounting message\n" );
287 #endif
288
289 msgPcktMountMessage( &pckt_info, ( BYTE** )&buffer, &buffsize );
290
291 #ifdef DEBUG_NET_MSGS
292 fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: mounted 'leave group' message\n" );
293 #endif
294
295 if ( rmcastSendPacket( soc, buffer, buffsize ) == 0 )
296 return 0;
297
298 #ifdef DEBUG_NET_MSGS
299 fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: sent 'leave group' message\n" );
300 #endif
301
302 #ifdef DEBUG_NET_MSGS
303 fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: Disconnecting... %s\n", group_addr );
304 #endif
305
306 lock_eventlist_and_cache( );
307 eventListInsert( &event_list,NULL, NULL, LEV_GRP_WAIT, -1 );
308 unlock_eventlist_and_cache( );
309
310 #ifdef DEBUG_NET_MSGS
311 fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: returning\n" );
312 #endif
313
314 return 1;
315 }
316
317 /*****************************************************************************************************
318 *
319 * void showLogMsg( char type, PACKET_INFO *pckt_info )
320 *
321 * Prints a message from type 'type', related to the packet 'pckt_info', in the log file 'logfile'.
322 *
323 * Arguments: the message type and the packet information.
324 *
325 * Side effects: changes the log file.
326 *
327 ******************************************************************************************************/
showLogMsg(char type,PACKET_INFO * pckt_info)328 void showLogMsg( char type, PACKET_INFO *pckt_info )
329 {
330 struct timeval current_time;
331 char buffer[200];
332 struct tm time;
333 #ifdef DSHOW_COMPLETE_MASK_IN_LOG
334 char auxbuffer[100];
335 int n = 0;
336 #endif
337
338 memset( buffer,0,sizeof( buffer ) );
339
340 if( logfile!= NULL )
341 {
342 gettimeofday( ¤t_time, 0 );
343 time = *gmtime(( const time_t* )¤t_time );
344 current_time. tv_sec = (( time.tm_hour * 60 + time.tm_min ) * 60 ) + time.tm_sec;
345
346 if( pckt_info !=NULL )
347 {
348 switch( pckt_info->type )
349 {
350 /*
351 ---------------------------------------------------------------------------------------------------------------------------
352 Time Snd/Rcv/Loss type sender_ip sender_pid requested_ip requested_pid sn [{base sn} {win size} {hmask} {lmask}]
353 ---------------------------------------------------------------------------------------------------------------------------
354 01234567890 x yz abc.def.ghi.jkl 1234567890 abc.def.ghi.jkl 1234567890 123456 123456 1234 1234 1234
355 */
356 case DATA_PACKET_TYPE:
357
358 sprintf( buffer,"%15s %10d %6d",
359 pckt_info->sender_id.ip,pckt_info->sender_id.pid, pckt_info->packet_data.data_packet.sn );
360
361 break;
362
363 case NAK_PACKET_TYPE:
364 #ifndef SINGLE_NACK
365 sprintf( buffer,"%15s %10d %15s %10d %6d",
366 pckt_info->sender_id.ip,
367 pckt_info->sender_id.pid,
368 pckt_info->packet_data.nak_packet.requested_member_id.ip,
369 pckt_info->packet_data.nak_packet.requested_member_id.pid,
370 pckt_info->packet_data.nak_packet.sn
371 );
372 #else
373 sprintf( buffer,"%15s %10d %15s %10d %6d %4d %4u %4u ",
374 pckt_info->sender_id.ip,
375 pckt_info->sender_id.pid,
376 pckt_info->packet_data.nak_packet.requested_member_id.ip,
377 pckt_info->packet_data.nak_packet.requested_member_id.pid,
378 pckt_info->packet_data.nak_packet.base_sn,
379 pckt_info->packet_data.nak_packet.window_size,
380 pckt_info->packet_data.nak_packet.hmask,
381 pckt_info->packet_data.nak_packet.lmask );
382 #ifdef DSHOW_COMPLETE_MASK_IN_LOG
383 for( n= ( pckt_info->packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
384 {
385 sprintf( auxbuffer,"%d", (( unsigned int )pckt_info->packet_data.nak_packet.hmask >> n ) & 1 );
386 strcat( buffer,auxbuffer );
387 }
388
389 sprintf( auxbuffer,"-" );
390 strcat( buffer,auxbuffer );
391
392 for( n= ( pckt_info->packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
393 {
394 sprintf( auxbuffer,"%d", (( unsigned int )pckt_info->packet_data.nak_packet.lmask >> n ) & 1 );
395 strcat( buffer,auxbuffer );
396 }
397 #endif /*DSHOW_COMPLETE_MASK_IN_LOG */
398
399 #endif
400 break;
401
402 case RETRANSM_PACKET_TYPE:
403 sprintf( buffer,"%15s %10d %15s %10d %6d",
404 pckt_info->sender_id.ip,
405 pckt_info->sender_id.pid,
406 pckt_info->packet_data.retransm_packet.original_sender_id.ip,
407 pckt_info->packet_data.retransm_packet.original_sender_id.pid,
408 pckt_info->packet_data.retransm_packet.data_packet.sn );
409 break;
410
411 #ifdef REFRESH
412 case REFRESH_PACKET_TYPE:
413 sprintf( buffer,"%15s %10d %6d",
414 pckt_info->sender_id.ip,pckt_info->sender_id.pid,
415 pckt_info->packet_data.refresh_packet.sn_of_last_msg_sent );
416 break;
417 #endif
418 case JOIN_REQUEST_PACKET_TYPE:
419 sprintf( buffer,"%15s %10d",pckt_info->sender_id.ip,pckt_info->sender_id.pid );
420 break;
421
422 case JOIN_ACCEPT_PACKET_TYPE:
423 sprintf( buffer,"%15s %10d",pckt_info->sender_id.ip,pckt_info->sender_id.pid );
424 break;
425
426 case LEAVE_GROUP_PACKET_TYPE:
427 sprintf( buffer,"%15s %10d",pckt_info->sender_id.ip,pckt_info->sender_id.pid );
428 break;
429
430 default:
431 fprintf( stderr, "Unknown packet type: %c\n",pckt_info->type );
432 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
433 }
434
435 fprintf( logfile,"%.0Lf %c %s %s\n",
436 (((( long double )current_time.tv_sec )*1000000+( long double )current_time.tv_usec ) ) ,
437 type,
438 MESSAGE_TYPE_NAME[( int )pckt_info->type],
439 buffer );
440 }
441 else
442 {
443 fprintf( logfile,"%.0Lf %c\n",
444 (((( long double )current_time.tv_sec )*1000000+( long double )current_time.tv_usec ) ) ,
445 type );
446 }
447 }
448 }
449
450 /*****************************************************************************************************
451 *
452 * void* rmcastReceivePackets( void *user_soc )
453 *
454 * Receives packets from socket soc.
455 *
456 * Arguments: the socket number, in soc;
457 *
458 * Side effects: may change the state of the event list or cache.
459 *
460 ******************************************************************************************************/
rmcastReceivePackets(void * user_soc)461 void *rmcastReceivePackets( void *user_soc )
462 {
463 int soc;
464 struct sockaddr_in sender;
465 static int first;
466 static int first_pckts_lost;
467 socklen_t len;
468 int sn;
469 int nbytes; /* number of bytes received */
470 char message[rmcast_options.rcv_buffer_size];
471 PACKET_INFO pckt_info;
472 MEMBER_ID aux_member_id;
473 #ifdef DPROCESSING_TIME
474 struct timeval time_before,time_after;
475 struct tm time;
476 FILE *proc_log;
477 char tipo;
478 #endif
479
480 soc = ( int )user_soc;
481 sn = 0;
482 first = 1;
483 first_pckts_lost = 10;
484
485 #ifndef SOLARIS
486 sigset_t alrmset;
487 sigfillset( &alrmset ); /* fills the alarm set with all the signals */
488
489 /* blocks all the signals, because they won�t be handled by this thread */
490 pthread_sigmask( SIG_BLOCK, &alrmset, NULL );
491 #endif
492 #ifdef DPROCESSING_TIME
493 proc_log=fopen( "proc_times.log","w" );
494 #endif
495 do{
496 bzero( message, rmcast_options.rcv_buffer_size );
497
498 #ifdef DEBUG_NET_MSGS
499 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: Waiting to receive a message...\n" );
500 #endif
501 len = sizeof( struct sockaddr_in );
502 nbytes = ( int )recvfrom( soc, message, rmcast_options.rcv_buffer_size, 0, ( struct sockaddr * ) &sender, &len );
503
504 #ifdef DEBUG_NET_MSGS
505 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: A message was received from the network!( message size=%d )\n",nbytes );
506 #endif
507
508 if( nbytes <= 0 )
509 {
510 fprintf( stderr, "rmcastReceivePackets: Error while receiving a message from the net - nbytes=%d \n",nbytes );
511 }
512 else /* nbytes > 0 */
513 {
514 #ifdef DEBUG_PCKT
515 {
516 int cont;
517 fprintf( stderr, "DEBUG_PCKT rmcastReceivePackets: Received message:\n" );
518 for( cont = 0; cont < nbytes; cont++ )
519 {
520 fprintf( stderr, "%d", ( ( signed char * )message )[cont] );
521 }
522 fprintf( stderr, "\n" );
523 }
524 #endif
525 msgPcktUnmountMessage( &pckt_info, ( BYTE* )message );
526
527 #ifdef DPROCESSING_TIME
528 gettimeofday( &time_before, 0 );
529 time = *gmtime(( const time_t* )&time_before );
530 time_before.tv_sec = (( time.tm_hour * 60 + time.tm_min ) * 60 ) + time.tm_sec;
531 tipo = pckt_info.type;
532
533 #endif
534
535 if( memberIsEqual( &( pckt_info.sender_id ), &( local_user_info.member_id ) ) )
536 {
537 /* We have received our own packet, so we just log and go to the next packet */
538 showLogMsg( RECEIVE,&pckt_info );
539 continue;
540 }
541
542
543 #ifdef DEBUG_NET_MSGS
544 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: message type=%s\n",MESSAGE_TYPE_NAME[( int )( pckt_info.type )] );
545 #endif
546
547 switch( pckt_info.type )
548 {
549 case DATA_PACKET_TYPE:
550 sn = pckt_info.packet_data.data_packet.sn; break;
551 case NAK_PACKET_TYPE:
552 sn = pckt_info.packet_data.nak_packet.sn;
553 #ifdef DSINGLE_NACK
554
555 fprintf( stderr, "NAK received:\n\tFrom:%s:%d \n\tRequesting: %s:%d:%d \n\twin_size:%d \tbase_sn:%d \thmask:%u \tlmask:%u \n",
556 pckt_info.sender_id.ip,
557 pckt_info.sender_id.pid,
558 pckt_info.packet_data.nak_packet.requested_member_id.ip,
559 pckt_info.packet_data.nak_packet.requested_member_id.pid,
560 pckt_info.packet_data.nak_packet.sn,
561 pckt_info.packet_data.nak_packet.window_size,
562 pckt_info.packet_data.nak_packet.base_sn,
563 ( unsigned int )pckt_info.packet_data.nak_packet.hmask,
564 ( unsigned int )pckt_info.packet_data.nak_packet.lmask );
565 {
566
567 int n;
568
569 fprintf( stderr, "\tHMASK:" );
570 for( n = ( pckt_info.packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
571 {
572 fprintf( stderr, "%d", (( unsigned int )pckt_info.packet_data.nak_packet.hmask >> n ) & 1 );
573 }
574
575 fprintf( stderr, "\n\tLMASK:" );
576
577 for( n = ( pckt_info.packet_data.nak_packet.window_size / 2 )-1; n>=0; n-- )
578 {
579 fprintf( stderr, "%d", (( unsigned int )pckt_info.packet_data.nak_packet.lmask >> n ) & 1 );
580 }
581
582 fprintf( stderr, "\n" );
583 }
584
585
586 #endif
587 break;
588 case RETRANSM_PACKET_TYPE:
589 sn = pckt_info.packet_data.retransm_packet.data_packet.sn; break;
590 case REFRESH_PACKET_TYPE:
591 sn = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent; break;
592 case JOIN_REQUEST_PACKET_TYPE:
593 sn = -1; break;
594 case JOIN_ACCEPT_PACKET_TYPE:
595 sn = -1; break;
596 case LEAVE_GROUP_PACKET_TYPE:
597 sn = -1; break;
598 default:
599 fprintf( stderr, "rmcastReceivePackets: Unknown packet type.\n" );
600 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
601 }
602 #ifdef DEBUG_NET_MSGS
603 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: sn=%d\n",sn );
604 #endif
605
606 #ifdef DEBUG_SHOW
607 msgPcktShowMessage(( BYTE * )message );
608 #endif
609
610 /* ------------------------ Loss Simulation begin----------------------------------------- */
611 if( rmcast_options.loss_prob > 0 )
612 {
613 /* Uncomment the following line if don't want naks and retransmission to be lost
614
615 if( pckt_info.type != NAK_PACKET_TYPE && pckt_info.type != RETRANSM_PACKET_TYPE ) */
616 {
617 if( generateSample( UNIFORM ) < rmcast_options.loss_prob ||
618 (( pckt_info.type == DATA_PACKET_TYPE ) &&( sn == 0 ) &&( first == 1 ) ) ||
619 ( first_pckts_lost > 0 )
620 )
621 /*
622 * We are always losing the first 'first_pckts_lost'( e.g., 10 ) received packets,
623 * and the first received data packet -
624 * that is, the first burst of received packets.
625 * After that, packets are lost according to the rmcast_options.loss_prob.
626 */
627 {
628 #ifdef DEBUG_NET_MSGS
629 fprintf( stderr, "\nDEBUG_NET_MSGS rmcastReceivePackets: PACKET LOST sn=%d\n\n",sn );
630 #endif
631 if( logfile != NULL )
632 {
633 showLogMsg( RECEIVE_AND_LOSE,&pckt_info );
634 }
635
636 if( pckt_info.type == DATA_PACKET_TYPE )
637 first = 0;
638
639 if( first_pckts_lost>0 )
640 first_pckts_lost--;
641
642 continue;
643 }
644 }
645 }
646 /* ------------------------ Loss Simulation end----------------------------------------- */
647 lock_eventlist_and_cache( );
648 #ifdef DMUTEX
649 fprintf( stderr, "DMUTEX rmcastReceivePackets: event list and cache LOCKED\n" );
650 showLogMsg( 'K', NULL );
651 #endif
652 if( logfile != NULL )
653 {
654 showLogMsg( RECEIVE,&pckt_info );
655 }
656
657 switch( pckt_info.type )
658 {
659 case NAK_PACKET_TYPE:
660 #ifdef SINGLE_NACK
661 {
662 int i,aux_window_mask[MAX_WINDOW_SIZE];
663
664 memset( aux_window_mask, 0, ( MAX_WINDOW_SIZE*( sizeof( int ) ) ) );
665 window_mask_numbers2array( aux_window_mask, pckt_info.packet_data.nak_packet.hmask, pckt_info.packet_data.nak_packet.lmask );
666 #ifdef DEBUG_NET_MSGS
667 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received!\n" );
668 #endif
669 if( !memberIsEqual( &( pckt_info.packet_data.nak_packet.requested_member_id ),&( local_user_info.member_id ) ) )
670 {
671 for( i = 0; i < MAX_WINDOW_SIZE; i++ )
672 {
673 if( aux_window_mask[i] == 1 )
674 {
675 /* Setting the real value to the sn */
676
677 pckt_info.packet_data.nak_packet.sn = pckt_info.packet_data.nak_packet.base_sn + i + 1;
678 #ifdef DNAK_RCV
679 fprintf( stderr, "DNAK_RCV we will try to remove event NAK_SND_WAIT for sn=%d\n",pckt_info.packet_data.nak_packet.sn );
680 #endif
681
682 if( rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) )
683 {
684 #ifdef DEBUG_NET_MSGS
685 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_RCV_WAIT or a SUPPRESSED_NAK.\n" );
686 #endif
687
688 #ifdef DNAK_RCV
689 fprintf( stderr, "Inserting a SUPPRESSED_NAK\n" );
690 #endif
691 rmcastInsertEvent( &pckt_info, SUPPRESSED_NAK );
692 }
693 }
694 }
695 }
696 for( i = 0; i < MAX_WINDOW_SIZE; i++ )
697 {
698 if( aux_window_mask[i] == 1 )
699 {
700 /* Setting the real value to the sn */
701 pckt_info.packet_data.nak_packet.sn = pckt_info.packet_data.nak_packet.base_sn + i + 1;
702
703 #ifdef DNAK_RCV
704 fprintf( stderr, "DNAK_RCV rmcastReceivePackets: Before calling rmcastCacheContains sn=%d\n",pckt_info.packet_data.nak_packet.sn );
705 #endif
706
707 if( rmcastCacheContains( &pckt_info ) &&( rmcastFindEvent( &pckt_info,RET_SND_WAIT ) == 0 ) )
708 {
709 #ifdef DEBUG_NET_MSGS
710 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_SND_WAIT.\n" );
711 #endif
712
713 #ifdef DNAK_RCV
714 fprintf( stderr, "Inserting a RET_SND_WAIT: %s:%d:%d\n",
715 pckt_info.packet_data.nak_packet.requested_member_id.ip,
716 pckt_info.packet_data.nak_packet.requested_member_id.pid,
717 pckt_info.packet_data.nak_packet.sn );
718 #endif
719 rmcastInsertEvent( &pckt_info,RET_SND_WAIT );
720 }
721 }
722 }
723 }
724 #else /* using multiple nacks */
725 #ifdef DEBUG_NET_MSGS
726 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received!\n" );
727 #endif
728
729 if( !memberIsEqual( &( pckt_info.packet_data.nak_packet.requested_member_id ),&( local_user_info.member_id ) )
730 && rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) )
731 {
732 #ifdef DEBUG_NET_MSGS
733 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_RCV_WAIT.\n" );
734 #endif
735 rmcastInsertEvent( &pckt_info,RET_RCV_WAIT );
736 }
737 else if( !rmcastFindEvent( &pckt_info,RET_SND_WAIT ) )
738 {
739 if( rmcastCacheContains( &pckt_info ) )
740 {
741 #ifdef DEBUG_NET_MSGS
742 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: a NAK was received. inserting a RET_SND_WAIT.\n" );
743 #endif
744 rmcastInsertEvent( &pckt_info,RET_SND_WAIT );
745 }
746 }
747 #endif /* SINGLE_NACK */
748 break;
749
750 case LEAVE_GROUP_PACKET_TYPE:
751 /*
752 * FIXME
753 * By now it is OK if the member does not exist in cache,
754 * because he is leaving the group anyway :- ) So we don't
755 * care about the return value for cacheDeactivateMember
756 */
757 cacheDeactivateMember( cache,&( pckt_info.sender_id ) );
758 break;
759
760 case JOIN_REQUEST_PACKET_TYPE:
761 {
762 char *buffer;
763 int buffsize;
764
765 PACKET_INFO pckt_info;
766
767 if( rmcast_options.cur_state_server_is_up )
768 {
769 pckt_info.type = JOIN_ACCEPT_PACKET_TYPE;
770 pckt_info.flags = 0;
771 pckt_info.sender_id = local_user_info.member_id;
772 pckt_info.packet_size = sizeof( int );
773 pckt_info.packet_data.join_accept_packet.port = rmcast_options.tcp_port;
774 #ifdef DEBUG_NET_MSGS
775 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: mounting the 'join accept packet'( port: %d )...", rmcast_options.tcp_port );
776 #endif
777 msgPcktMountMessage( &pckt_info, ( BYTE** )&buffer, &buffsize );
778 #ifdef DEBUG_SHOW
779 fprintf( stderr, "DEBUG_SHOW rmcastReceivePackets: ok!\n" );
780 msgPcktShowMessage(( BYTE* )buffer );
781 #endif
782 #ifdef DEBUG_NET_MSGS
783 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: now, sending the message to the mcast group\n" );
784 #endif
785 rmcastSendPacket( local_user_info.socket, ( BYTE* )buffer, buffsize );
786 }
787 }
788 break;
789
790 case JOIN_ACCEPT_PACKET_TYPE:
791 /*
792 * Ignore this packet. He is handled only when we are entering the group.
793 * See RM_getCurState at rmcast.c
794 */
795 break;
796
797 case REFRESH_PACKET_TYPE:
798 #ifdef REFRESH
799 rmcastProcessRefreshPacket( pckt_info );
800 #endif
801 break;
802
803 case RETRANSM_PACKET_TYPE:
804
805 #ifdef DSUPPRESSION
806 if( rmcastRemoveEvent( &pckt_info,RET_SND_WAIT ) ){
807 fprintf( suppressionfile,"RT %s %d %s %d %d\n",
808 pckt_info.sender_id.ip,
809 pckt_info.sender_id.pid,
810 pckt_info.packet_data.retransm_packet.original_sender_id.ip,
811 pckt_info.packet_data.retransm_packet.original_sender_id.pid,
812 pckt_info.packet_data.retransm_packet.data_packet.sn );
813 }
814 #else
815 rmcastRemoveEvent( &pckt_info,RET_SND_WAIT );
816 #endif
817 if( !rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) &&
818 !rmcastRemoveEvent( &pckt_info,RET_RCV_WAIT ) )
819 {
820 if( rmcastCacheContains( &pckt_info ) )
821 break;
822 }
823
824 /* Now we will get the content of the retransmition and put it in the cache. */
825 strcpy( aux_member_id.ip, pckt_info.packet_data.retransm_packet.original_sender_id.ip );
826 aux_member_id.pid=pckt_info.packet_data.retransm_packet.original_sender_id.pid;
827 pckt_info.packet_data.data_packet = pckt_info.packet_data.retransm_packet.data_packet;
828 pckt_info.sender_id = aux_member_id;
829
830 /* continue... */
831 case DATA_PACKET_TYPE:
832 rmcastProcessDataPacket( pckt_info ); /* inserts data in cache... */
833
834 #ifdef DSINGLE_NACK
835 fprintf( stderr, "DSINGLE_NACK rmcastReceivePackets: after executing rmcastProcessDataPacket.\n" );
836 eventListShow( event_list );
837 #endif
838 break;
839
840 default:
841 fprintf( stderr, "rmcastReceivePackets: Invalid message type.( %d )\n",pckt_info.type );
842 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
843
844 } /* End switch */
845 unlock_eventlist_and_cache( );
846 #ifdef DMUTEX
847 showLogMsg( 'U', NULL );
848 fprintf( stderr, "DMUTEX rmcastReceivePackets: event list and cache UNLOCKED\n" );
849 #endif
850 } /* nbytes > 0 */
851
852 #ifdef DEBUG_NET_MSGS
853 fprintf( stderr, "DEBUG_NET_MSGS rmcastReceivePackets: received, ok!\n" );
854 #endif
855 #ifdef DPROCESSING_TIME
856 gettimeofday( &time_after, 0 );
857 time = *gmtime(( const time_t* )&time_after );
858 time_after.tv_sec = (( time.tm_hour * 60 + time.tm_min ) * 60 ) + time.tm_sec;
859
860 time_after.tv_sec = time_after.tv_sec - time_before.tv_sec;
861 time_after.tv_usec = time_after.tv_usec - time_before.tv_usec;
862
863 fprintf( proc_log,"%d %.0Lf\n",tipo, (( long double )time_after.tv_sec*1000000+( long double )time_after.tv_usec ) );
864 #endif
865
866 } FOREVER;
867
868 return NULL;
869 }
870
871 /*****************************************************************************************************
872 *
873 * void rmcastSendPacket( int soc, BYTE *message, int message_size )
874 *
875 * Sends a message packet of size message_size to socket soc.
876 *
877 * Returns: 1, on success;
878 * 0, on error.
879 *
880 ******************************************************************************************************/
rmcastSendPacket(int soc,BYTE * message,int message_size)881 int rmcastSendPacket( int soc, BYTE *message, int message_size )
882 {
883 PACKET_INFO pckt_info;
884 struct sockaddr_in dest;
885 int sn;
886
887 #ifdef DEBUG_SHOW2
888 int i;
889 #endif
890
891 int retval;
892 char type = *(( char* )message );
893 char local_message[ message_size + 1 ];
894 struct timespec sleep_time;
895
896 #ifdef DEBUG_SHOW2
897 fprintf( stderr, "DEBUG_SHOW2 - rmcastSendPacket: will copy a message of size %d to local var.\n",message_size );
898 for( i=0; i< message_size; i++ )
899 {
900 fprintf( stderr, "( %d )%d",i, ( unsigned int )message[i] );
901 }
902 fprintf( stderr, "\n" );
903 #endif
904
905 memcpy( local_message, message, message_size );
906 dest.sin_family = AF_INET;
907 dest.sin_port = htons ( local_user_info.group_id.port );
908 dest.sin_addr.s_addr = inet_addr( local_user_info.group_id.ip );
909
910 if( type == DATA_PACKET_TYPE )
911 {
912 /* REFRESH event is now inserted only after user sends the first packet, i.e., local_user_info.sn==0 */
913 if( local_user_info.sn == 0 )
914 {
915 eventListInsert( &event_list, &( local_user_info.member_id ), NULL, REF_SND_WAIT, -1 );
916 }
917 pthread_mutex_lock( &change_local_user_sn );
918 local_user_info.sn++;
919 pthread_mutex_unlock( &change_local_user_sn );
920 }
921
922 msgPcktUnmountMessage( &pckt_info, ( BYTE * )local_message );
923
924 switch( pckt_info.type )
925 {
926 case DATA_PACKET_TYPE:
927 sn = pckt_info.packet_data.data_packet.sn; break;
928 case NAK_PACKET_TYPE:
929 sn = pckt_info.packet_data.nak_packet.sn; break;
930 case RETRANSM_PACKET_TYPE:
931 sn = pckt_info.packet_data.retransm_packet.data_packet.sn; break;
932 case REFRESH_PACKET_TYPE:
933 sn = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent; break;
934 case JOIN_REQUEST_PACKET_TYPE:
935 sn = -1; break;
936 case JOIN_ACCEPT_PACKET_TYPE:
937 sn = -1; break;
938 default:
939 sn = -1;
940 }
941 #ifdef DEBUG_NET_MSGS
942 fprintf( stderr, "DEBUG_NET_MSGS rmcastSendPacket: will send a %s - sn: %d\n", MESSAGE_TYPE_NAME[( int )type],sn );
943 #endif
944
945 if( logfile != NULL )
946 {
947 showLogMsg( SEND,&pckt_info );
948 }
949
950 retval = sendto( soc, local_message, message_size, 0, ( struct sockaddr * )&dest, sizeof( dest ) );
951
952 if( retval < 0 )
953 {
954 fprintf( stderr, "ERROR - rmcastSendPacket: retval < 0\n" );
955 perror( "send_message" );
956
957 msgPcktShowMessage( message );
958
959 return 0;
960 }
961 /* fprintf( stderr, " Packet size RML: %d\n",message_size ); FIXME teste pra saber o tamanho dos pacotes */
962
963 if( ( rmcast_options.microsleep > 0 ) &&( rmcast_options.microsleep < 65536 ) )
964 {
965 /* Converting rmcast_options.microsleep to nanoseconds as needed by nanosleep function */
966 sleep_time.tv_nsec= ( rmcast_options.microsleep*1000 );
967 sleep_time.tv_sec=0;
968 nanosleep( &sleep_time,NULL );
969 /* usleep( rmcast_options.microsleep ); */
970 }
971 return 1;
972 }
973
974 /*****************************************************************************************************
975 *
976 * void *rmcastHandleSignals( void *arg )
977 *
978 * This thread will be the only one that threats the alarm events.
979 *
980 * Communication with it should be done using SIGUSR1.
981 *
982 *****************************************************************************************************/
983 extern const char * const sys_siglist[];
984 #define _GNU_SOURCE
985 #include <string.h>
986 char *strsignal( int sig );
987 extern const char * const sys_siglist[];
988
rmcastHandleSignals(void * arg)989 void *rmcastHandleSignals( void *arg )
990 {
991 int sig;
992 int err;
993 int finish = 0;
994
995 #ifndef SOLARIS
996 struct itimerval value;
997 #else
998 sigset_t alrmset;
999 sigfillset( &alrmset );
1000 pthread_sigmask( SIG_BLOCK, &alrmset, NULL );
1001 #endif
1002
1003 while( 1 )
1004 {
1005 #ifdef DEBUG_EVENTS
1006 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: Before sigwait( )\n" );
1007 #endif
1008 err = sigwait( &alrmset, &sig );
1009 #ifdef DEBUG_EVENTS
1010 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: After sigwait( ) \n" );
1011 #endif
1012 #ifdef DEBUG_EVENTS
1013 #ifdef SOLARIS
1014 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals( SOLARIS ): signal ( %d ) received\n", sig );
1015 #else
1016 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: signal %s( %d ) received\n",sys_siglist[sig], sig );
1017 #endif
1018 #endif
1019 if( err || ( sig != SIGALRM && sig != SIGUSR1 ) )
1020 {
1021 fprintf( stderr, "rmcastHandleSignals: [%s signal catched]\n",strsignal( sig ) );
1022 if( sig == SIGINT || sig == SIGTERM )
1023 {
1024 fprintf( stderr, "rmcastHandleSignals: Signal %s received. Exiting.\n",strsignal( sig ) );
1025 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1026 }
1027 else if( sig == SIGTSTP )
1028 {
1029 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: SIGSTP received. Nothing to be done.\n" );
1030 /* FIXME( in a future release, we may be able to handle the SIGTSTP signal ) */
1031 }
1032 else if( sig == SIGUSR2 )
1033 {
1034 STOP_TRANSMISSION = 1;
1035 }
1036 else continue;
1037 }
1038 lock_eventlist_and_cache( );
1039
1040 #ifdef DMUTEX
1041 showLogMsg( 'k', NULL );
1042 fprintf( stderr, "DMUTEX rmcastHandleSignals: event list and cache LOCKED\n" );
1043 #endif
1044 if( sig == SIGALRM ||( event_list != NULL && event_list->timer_value == 0 ) )
1045 {
1046 #ifdef DSINGLE_NACK
1047 fprintf( stderr, "DSINGLE_NACK rmcastHandleSignals: calling rmcastCatchEvents\n" );
1048 #endif
1049 finish = rmcastCatchEvents( 0 );
1050
1051 #ifdef DSINGLE_NACK
1052 fprintf( stderr, "DSINGLE_NACK rmcastHandleSignals: back from rmcastCatchEvents\n" );
1053 #endif
1054 if( finish )
1055 break;
1056 }
1057 #ifdef DSINGLE_NACK
1058 fprintf( stderr, "DSINGLE_NACK rmcastHandleSignals: showing eventlist:\n" );
1059 eventListShow( event_list );
1060 #endif
1061
1062 if( event_list != NULL )
1063 {
1064 #ifdef DEBUG_EVENTS
1065 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: setting timer to %ld( rmcastHandleSignals )\n",event_list->timer_value );
1066 #endif
1067
1068 #ifndef SOLARIS
1069 setTimerValue( &value,event_list->timer_value );
1070 setitimer( ITIMER_REAL, &value,NULL );
1071 #else
1072 alarm( event_list->timer_value/1000000 + 1 );
1073 #endif
1074 }
1075 else
1076 {
1077 #ifdef DEBUG_EVENTS
1078 fprintf( stderr, "DEBUG_EVENTS rmcastHandleSignals: resetting timer( rmcastHandleSignals )\n" );
1079 #endif
1080
1081 #ifndef SOLARIS
1082 setTimerValue( &value,0 );
1083 setitimer( ITIMER_REAL,&value,NULL );
1084 #else
1085 alarm( 0 );
1086 #endif
1087 }
1088 unlock_eventlist_and_cache( );
1089 #ifdef DMUTEX
1090 showLogMsg( 'u', NULL );
1091 fprintf( stderr, "DMUTEX rmcastHandleSignals: cache and event list UNLOCKED\n" );
1092 #endif
1093 }
1094 return 0;
1095 }
1096
1097 /*****************************************************************************************************
1098 *
1099 * int rmcastCatchEvents ( int i )
1100 *
1101 * Catch and execute event list events.
1102 *
1103 ******************************************************************************************************/
rmcastCatchEvents(int i)1104 int rmcastCatchEvents( int i )
1105 {
1106 int retval = 0;
1107 int function_retval = 0;
1108 struct sockaddr_in group;
1109 struct ip_mreq mreq;
1110 CACHE *cache_sender_entry;
1111 #ifndef SINGLE_NACK
1112 int update_nak_list_retval;
1113 #endif
1114 EVENT_LIST first;
1115 int msgsize;
1116 BYTE *message;
1117 PACKET_INFO pack;
1118 DATA_PACKET *data;
1119
1120 #ifdef DEBUG_EVENTS
1121 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Event catched!\n" );
1122 #endif
1123 if( event_list == NULL )
1124 {
1125 #ifdef DEBUG_EVENTS
1126 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Internal warning! first == NULL\n" );
1127 #endif
1128 return retval;
1129 }
1130 first = *event_list;
1131
1132 #ifdef DSINGLE_NACK
1133 fprintf( stderr, "DSINGLE_NACK rmcastCatchEvents: showing event list:\n" );
1134 #endif
1135 eventListShow( event_list );
1136
1137 #ifdef DEBUG_EVENTS
1138 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: event list: %p\n",&( *event_list ) );
1139 #endif
1140
1141 do
1142 {
1143 #ifdef DEBUG_EVENTS
1144 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Removing event: action %s timer value: %ld\n",
1145 EVENT_ACTION_NAME[( int )first.action],
1146 first.timer_value );
1147 #endif
1148 #ifdef DNAK_RCV
1149 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Removing event: action %s timer value: %ld\n",
1150 EVENT_ACTION_NAME[( int )first.action],
1151 first.timer_value );
1152 #endif
1153 if( eventListRemoveFirst( &event_list ) == -1 )
1154 {
1155 #ifdef DEBUG_EVENTS
1156 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: warning eventListRemoveFirst( &event_list ) == -1\n" );
1157 #endif
1158 }
1159 eventListShow( event_list );
1160
1161 switch( first.action )
1162 {
1163 case NAK_SND_WAIT:
1164 pack.type=NAK_PACKET_TYPE;
1165 pack.flags=0;
1166 pack.sender_id = local_user_info.member_id;
1167 pack.packet_size=sizeof( NAK_PACKET );
1168 strcpy( pack.packet_data.nak_packet.requested_member_id.ip,first.member_id->ip );
1169 pack.packet_data.nak_packet.requested_member_id.pid=first.member_id->pid;
1170 pack.packet_data.nak_packet.sn=first.sn;
1171 #ifdef SINGLE_NACK
1172 {
1173 CACHE *aux_cache_member = NULL;
1174 aux_cache_member = cacheLookForMember( &cache, first.member_id );
1175
1176 if( aux_cache_member!=NULL )
1177 {
1178 pack.packet_data.nak_packet.base_sn=aux_cache_member->sm_info.member_status.last_seq_rcv;
1179 pack.packet_data.nak_packet.window_size=aux_cache_member->sm_info.member_status.window_size;
1180
1181 window_mask_array2numbers( aux_cache_member->sm_info.member_status.window_mask,
1182 &pack.packet_data.nak_packet.hmask, &pack.packet_data.nak_packet.lmask, aux_cache_member->sm_info.member_status.window_ini );
1183 }
1184 }
1185 #endif
1186 msgPcktMountMessage( &pack, &message, &msgsize );
1187 #ifdef DEBUG_EVENTS
1188 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: will send NAK ...\n" );
1189 #endif
1190 rmcastSendPacket( local_user_info.socket, message, msgsize );
1191
1192 #ifdef DEBUG_EVENTS
1193 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: NAK sent OK.\n" );
1194 #endif
1195 cache_sender_entry = cacheLookForMember( &cache,first.member_id );
1196
1197 if( cache_sender_entry != NULL )
1198 retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ), cache_sender_entry->sm_info.member_id.ip, RET_RCV_WAIT, first.sn );
1199
1200 #ifdef DEBUG_EVENTS
1201 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: RET_RCV_WAIT was inserted in the event list.\n" );
1202 #endif
1203 break;
1204
1205 case RET_RCV_WAIT:
1206
1207 /* Next we will check if it is possible to send one more NAK for the lost message,
1208 i.e., the NAK should be sent only rmcast_options.max_nak times */
1209 #ifdef DEBUG_EVENTS
1210 fprintf( stderr, " \nDEBUG_EVENTS - rmcastCatchEvents: Waiting for retransmission timed out\n ip: %s - pid: %d - sn: %d\n",
1211 first.member_id->ip, first.member_id->pid, first.sn );
1212 #endif
1213 #ifndef SINGLE_NACK
1214 /* When we are using SINGLE_NACK this check is done in the event list insert funciton */
1215 if(( update_nak_list_retval = cacheUpdateNakList( &cache, first.member_id, first.sn ) ) > 0 )
1216 {
1217 #endif
1218 cache_sender_entry = cacheLookForMember( &cache,first.member_id );
1219
1220 if( cache_sender_entry != NULL )
1221 #ifdef SINGLE_NACK
1222 retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ), cache_sender_entry->sm_info.member_id.ip, RET_RCV_EXPIRED, first.sn );
1223 #else
1224 retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ), cache_sender_entry->sm_info.member_id.ip, NAK_SND_WAIT, first.sn );
1225 #endif /* SINGLE_NACK */
1226
1227 #ifdef DEBUG_EVENTS
1228 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: NAK_SND_WAIT was inserted in the event list.\n" );
1229 #endif
1230
1231 #ifndef SINGLE_NACK
1232 }else if( update_nak_list_retval == 0 )
1233 {
1234 /*
1235 * We have sent the max number of NAK( see rmcast_options structure ) we didn't recover the packet
1236 * so we must exit because we cannot keep the reliability
1237 */
1238 fprintf( stderr, "********\n" );
1239 fprintf( stderr, "rmcastCatchEvents ERROR: Recovering packet failed. Max number( %d ) of NAKs reached!\n",rmcast_options.max_nak );
1240 fprintf( stderr, "\tHost IP:PID=%s:%d\tsn=%d\n",first.member_id->ip,first.member_id->pid,first.sn );
1241 fprintf( stderr, "********\n" );
1242
1243 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1244 }
1245 #endif
1246 break;
1247
1248 case RET_SND_WAIT:
1249
1250 if(( data=cacheLookForMessage( &cache, first.member_id, first.sn ) )!=NULL )
1251 {
1252 pack.type = RETRANSM_PACKET_TYPE;
1253 pack.flags = 0;
1254 pack.sender_id = local_user_info.member_id;
1255 pack.packet_size = sizeof( RETRANSM_PACKET );
1256 pack.packet_data.retransm_packet.original_sender_id = *first.member_id;
1257 pack.packet_data.retransm_packet.data_packet = *data;
1258 msgPcktMountMessage( &pack, &message, &msgsize );
1259
1260 #ifdef DEBUG_EVENTS
1261 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: will send a RET... " );
1262 #endif
1263 rmcastSendPacket( local_user_info.socket, message, msgsize );
1264 #ifdef DEBUG_EVENTS
1265 fprintf( stderr, "ok( rmcastCatchEvents )\n" );
1266 #endif
1267 }
1268 #ifdef DEBUG_EVENTS
1269 fprintf( stderr, " \nDEBUG_EVENTS - rmcastCatchEvents: Message retransmited\n ip: %s - pid: %d - sn: %d\n",
1270 first.member_id->ip, first.member_id->pid, first.sn );
1271 #endif
1272 break;
1273
1274 #ifdef REFRESH
1275 case REF_SND_WAIT:
1276 #ifdef DEBUG_EVENTS
1277 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Processing the refresh event.\n" );
1278 #endif
1279 pack.type = REFRESH_PACKET_TYPE;
1280 pack.flags = 0;
1281 pack.sender_id = local_user_info.member_id;
1282 pack.packet_size = sizeof( REFRESH_PACKET );
1283
1284 pthread_mutex_lock( &change_local_user_sn );
1285 pack.packet_data.refresh_packet.sn_of_last_msg_sent = local_user_info.sn - 1;
1286 pthread_mutex_unlock( &change_local_user_sn );
1287
1288 #ifdef DEBUG_EVENTS
1289 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Mounting refresh packet.\n" );
1290 #endif
1291 msgPcktMountMessage( &pack, &message, &msgsize );
1292 if( rmcastSendPacket( local_user_info.socket, message, msgsize ) == 1 )
1293 eventListInsert( &event_list, NULL, NULL, REF_SND_WAIT, -1 );
1294 /* We are not supposed to set a valid sn in the sn field
1295 when we are inserting a REF_SND_WAIT in the event list,
1296 so we use a -1 value to the sn in this eventListInsert
1297 function call*/
1298
1299 #ifdef DEBUG_EVENTS
1300 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: refresh packet mounted.\n" );
1301 #endif
1302 #ifdef DEBUG_EVENTS
1303 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Refresh event processed.\n" );
1304 #endif
1305 break;
1306
1307 #endif
1308
1309 case LEV_GRP_WAIT:
1310
1311 #ifdef DEBUG_EVENTS
1312 fprintf( stderr, "DEBUG_EVENTS - rmcastCatchEvents: Processing 'leave group' event.\n" );
1313 #endif
1314 if(( group.sin_addr.s_addr = inet_addr( local_user_info.group_id.ip ) ) == ( unsigned )-1 )
1315 perror( "rmcastLeaveGroup: inet_addr" );
1316
1317 mreq.imr_multiaddr = group.sin_addr;
1318 mreq.imr_interface.s_addr = INADDR_ANY;
1319
1320 if( rmcast_options.transmission_mode == MULTICAST )
1321 {
1322 if( setsockopt( local_user_info.socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, ( char * )&mreq, sizeof( mreq ) ) == -1 )
1323 perror( "rmcastLeaveGroup: DROP_MEMBERSHIP" );
1324 }
1325
1326 #ifdef DEBUG_NET_MSGS
1327 fprintf( stderr, "DEBUG_NET_MSGS rmcastLeaveGroup: disconnected\n" );
1328 #endif
1329 remove_queues( 0 );
1330
1331 if( rmcast_options.shut_down_routine!=NULL )
1332 rmcast_options.shut_down_routine( );
1333
1334 function_retval = 1;
1335 RM_terminate();
1336
1337 break;
1338
1339 default:
1340 fprintf( stderr, "rmcastCatchEvents Error: Action Undefined %d\n",first.action );
1341 }
1342
1343 if( event_list == NULL )
1344 break;
1345
1346 first = *event_list;
1347
1348 } while( first.timer_value == 0 && function_retval != 1 );
1349
1350 eventListShow( event_list );
1351
1352 return function_retval;
1353 }
1354 /*********************** End of the Main routines *************************************************/
1355
1356 /**************** Routines to facilitate the interface with the event list and the cache **********/
1357
1358 /*****************************************************************************************************
1359 *
1360 * void rmcastProcessDataPacket( PACKET_INFO pckt_info )
1361 *
1362 * Processes the data packet.
1363 *
1364 * Arguments: the packet to be processed, in pckt_info;
1365 *
1366 * Side effects: may affect the event list, the cache and the message queue.
1367 *
1368 ******************************************************************************************************/
rmcastProcessDataPacket(PACKET_INFO pckt_info)1369 void rmcastProcessDataPacket( PACKET_INFO pckt_info )
1370 {
1371 int i, retval;
1372
1373 MEMBER_ID *aux_member_id = &( pckt_info.sender_id );
1374 CACHE_NODE *aux_pckt_info;
1375 CACHE *p_old_cache_sender_entry = NULL, *p_new_cache_sender_entry = NULL;
1376 #ifdef SINGLE_NACK
1377 CACHE *cache_aux1_entry = NULL;
1378 #endif
1379 CACHE *aux_cache_member;
1380 CACHE old_cache_sender_entry, new_cache_sender_entry;
1381
1382 bzero(( &old_cache_sender_entry ),sizeof( CACHE ) );
1383
1384 ( old_cache_sender_entry ).sm_info.member_status.last_identified = -1;
1385 ( old_cache_sender_entry ).sm_info.member_status.first_rcv = -1;
1386 ( old_cache_sender_entry ).sm_info.member_status.last_rcv = -1;
1387 ( old_cache_sender_entry ).sm_info.member_status.last_seq_rcv = -1;
1388
1389 #ifdef SINGLE_NACK
1390 ( old_cache_sender_entry ).sm_info.member_status.window_size = MAX_WINDOW_SIZE;
1391 ( old_cache_sender_entry ).sm_info.member_status.window_ini = 0;
1392 memset(( old_cache_sender_entry ).sm_info.member_status.window_mask, 0, MAX_WINDOW_SIZE );
1393 #endif
1394
1395 p_old_cache_sender_entry = cacheLookForMember( &cache, aux_member_id );
1396
1397 if( ( p_old_cache_sender_entry != NULL ) )
1398 {
1399 old_cache_sender_entry = *p_old_cache_sender_entry;
1400
1401 if( pckt_info.type == DATA_PACKET_TYPE )
1402 {
1403 /* Delete all events about this packet */
1404
1405 if( !rmcastRemoveEvent( &pckt_info,NAK_SND_WAIT ) )
1406 {
1407 #ifdef DSINGLE_NACK
1408 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: no NAK_SND_WAIT event found.\n" );
1409 #endif
1410 if( !rmcastRemoveEvent( &pckt_info,RET_RCV_WAIT ) )
1411 {
1412 #ifdef DSINGLE_NACK
1413 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: no RET_RCV_WAIT found.\n" );
1414 #endif
1415
1416 rmcastRemoveEvent( &pckt_info,RET_SND_WAIT );
1417
1418 #ifdef DSINGLE_NACK
1419 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: After rmcastRemoveEvent( &pckt_info,RET_SND_WAIT ).\n" );
1420 #endif
1421 }
1422 }
1423 }
1424 }
1425
1426 pckt_info.type = DATA_PACKET_TYPE;
1427 #ifdef DEBUG_CACHE
1428 fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: Inserting data in cache: ip: %s pid: %d\n",aux_member_id->ip,aux_member_id->pid );
1429 #endif
1430 aux_cache_member=cacheLookForMember( &cache, &( pckt_info.sender_id ) );
1431
1432 #ifdef SINGLE_NACK /*block begin*/
1433 {
1434 int aux_sn;
1435 int aux_cont;
1436 int aux_last_seq_rcv;
1437 EVENT_LIST *aux_list_node;
1438
1439 aux_sn = -1;
1440 aux_cont = 0;
1441 aux_list_node = NULL;
1442 aux_last_seq_rcv = -1;
1443
1444 if( aux_cache_member != NULL )
1445 {
1446 aux_last_seq_rcv = aux_cache_member->sm_info.member_status.last_seq_rcv;
1447 }
1448 #endif
1449 if(( aux_cache_member == NULL ) ||( pckt_info.packet_data.data_packet.sn > aux_cache_member->sm_info.member_status.last_seq_rcv ) )
1450 retval = cacheInsertMessage( &cache, &( pckt_info.sender_id ), &( pckt_info.packet_data.data_packet ) );
1451 else
1452 return; /* Old packet. We don't need to handle it again */
1453
1454 p_new_cache_sender_entry = cacheLookForMember( &cache, aux_member_id );
1455 /* We don't need to test if p_new_cache_sender_entry is NULL
1456 because we have just inserted it into the cache */
1457 #ifdef SINGLE_NACK
1458 /* When we are sliding the window we must check if we have to send a nack to the next sn available in the window_mask */
1459
1460 if( p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv > aux_last_seq_rcv )
1461 {
1462
1463 aux_cont = ( p_new_cache_sender_entry->sm_info.member_status.last_identified -
1464 p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv );
1465
1466 if( aux_cont > MAX_WINDOW_SIZE )
1467 {
1468 aux_cont = MAX_WINDOW_SIZE;
1469 }
1470
1471 for( i = p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv + 1;
1472 i <= ( aux_cont + p_new_cache_sender_entry->sm_info.member_status.last_seq_rcv );
1473 i++ )
1474 {
1475 aux_sn = i;
1476
1477 if( cacheLookForMessage( &cache, &( pckt_info.sender_id ), aux_sn ) == NULL )
1478 {
1479 #ifdef DSINGLE_NACK
1480 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: trying to insert NAK_SND_WAIT event, sn=%d\n",aux_sn );
1481 #endif
1482 if( eventListFind( &event_list,&( pckt_info.sender_id ),NAK_SND_WAIT,aux_sn,&aux_list_node ) != NULL )
1483 {
1484 #ifdef DSINGLE_NACK
1485 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: NAK_SND_WAIT exists,just set sn=%d bit\n",aux_sn );
1486 #endif
1487 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,aux_sn,&( aux_cache_member->sm_info.member_status ),NAK_SND_WAIT );
1488 }
1489 else
1490 {
1491 if( eventListFind( &event_list,&( pckt_info.sender_id ),RET_RCV_WAIT,aux_sn,&aux_list_node ) != NULL )
1492 {
1493 #ifdef DSINGLE_NACK
1494 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: RET_RCV_WAIT event exists, just set sn=%d bit\n",aux_sn );
1495 #endif
1496 window_mask_set_bit( aux_cache_member->sm_info.member_status.window_mask,aux_sn,&( aux_cache_member->sm_info.member_status ),RET_RCV_WAIT );
1497 }
1498 else
1499 {
1500 #ifdef DSINGLE_NACK
1501 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket:Inserting NAK_SND_WAIT event, sn=%d:\n",aux_sn );
1502 #endif
1503 eventListInsert( &event_list,&( aux_cache_member->sm_info.member_id ),aux_cache_member->sm_info.member_id.ip, NAK_SND_WAIT,aux_sn );
1504 }
1505 }
1506 }
1507 }
1508 }
1509 }
1510 #endif /*SINGLE_NACK block end*/
1511
1512
1513 new_cache_sender_entry = *p_new_cache_sender_entry;
1514 cacheShow( cache );
1515 eventListShow( event_list );
1516 aux_pckt_info = new_cache_sender_entry.last_inserted;
1517
1518 if( retval == 1 ) /* the inserted packet is in sequence -> we will send data to the application */
1519 {
1520 #ifdef DEBUG_CACHE
1521 fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: The inserted packet is in sequence...\n" );
1522 #endif
1523 for( i = old_cache_sender_entry.sm_info.member_status.last_seq_rcv + 1;
1524 i <= new_cache_sender_entry.sm_info.member_status.last_seq_rcv;
1525 i++ )
1526 {
1527 #ifdef DEBUG_CACHE
1528 fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: A \"reliable\" message will be enqueued:( sn: %d size: %d )\n",
1529 i, pckt_info.packet_data.data_packet.data_size );
1530 #endif
1531 if( messageQueueEnter(( char* )( aux_pckt_info->data_pckt.data ),pckt_info.packet_data.data_packet.data_size ) != -1 )
1532 {
1533 aux_pckt_info = aux_pckt_info->previous;
1534 }
1535 else
1536 {
1537 fprintf( stderr, "messageQueueEnter ERROR!\n" );
1538 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1539 }
1540 }
1541 #ifdef DSINGLE_NACK
1542 eventListShow( event_list );
1543 #endif
1544 }
1545 else if( retval == 2 ) /* the inserted packet is not in sequence -> we will send nak's through the network */
1546 {
1547 #ifdef DEBUG_CACHE
1548 fprintf( stderr, "DEBUG_CACHE rmcastprocessdatapacket: The inserted packet is NOT in sequence... \n" );
1549 fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: showing eventlist:\n" );
1550 eventListShow( event_list );
1551 #endif
1552 for( i = old_cache_sender_entry.sm_info.member_status.last_identified+1;
1553 i < new_cache_sender_entry.sm_info.member_status.last_identified;
1554 i++ )
1555 {
1556 pckt_info.packet_data.data_packet.sn = i;
1557
1558 if( cacheLookForMessage( &cache, aux_member_id, i )==NULL )
1559 {
1560
1561 #ifdef DEBUG_EVENTS
1562 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: trying to schedule a NAK_SND_WAIT event to( sn: %d )\n", i );
1563 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: new_cache_sender_entry.sm_info.member_status.last_seq_rcv=%d\n",
1564 new_cache_sender_entry.sm_info.member_status.last_seq_rcv );
1565 #endif
1566 #ifdef SINGLE_NACK
1567 if( i < new_cache_sender_entry.sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE )
1568 {
1569 #endif
1570 if( !rmcastFindEvent( &pckt_info,NAK_SND_WAIT ) )
1571 {
1572 if( !rmcastFindEvent( &pckt_info, RET_RCV_WAIT ) )
1573 {
1574 #ifndef SINGLE_NACK
1575 /* when we are using SINGLE_NACK this check is done inside eventListInsert */
1576 if( cacheUpdateNakList( &cache, &( pckt_info.sender_id ), i )!=0 )
1577 {
1578 #endif
1579 rmcastInsertEvent( &pckt_info,NAK_SND_WAIT );
1580 #ifdef DEBUG_EVENTS
1581 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: NAK_SND_WAIT was inserted in the event list.\n" );
1582 #endif
1583 #ifndef SINGLE_NACK
1584 }
1585 else
1586 {
1587 /*
1588 * We have sent the max number of NAK( see rmcast_options structure ) we didn't recover the packet
1589 * so we must exit because we cannot keep the reliability
1590 */
1591 fprintf( stderr, "********\n" );
1592 fprintf( stderr, "rmcastProcessDataPacket ERROR: Recovering packet failed. Max number( %d ) of NAKs reached!\n",rmcast_options.max_nak );
1593 fprintf( stderr, "\tHost IP:PID=%s:%d\tsn=%d\n",aux_member_id->ip,aux_member_id->pid,i );
1594 fprintf( stderr, "********\n" );
1595
1596 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1597 }
1598 #endif
1599 }
1600 #ifdef SINGLE_NACK
1601 else
1602 {
1603 cache_aux1_entry=cacheLookForMember( &cache, aux_member_id );
1604 #ifdef DSINGLE_NACK
1605 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: before window_mask_set_bit\n" );
1606 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: there is a RET_RCV_WAIT, just set sn=%d\n",i );
1607 #endif
1608 window_mask_set_bit( cache_aux1_entry->sm_info.member_status.window_mask,
1609 i,&cache_aux1_entry->sm_info.member_status,RET_RCV_WAIT );
1610 }
1611 #endif
1612 }
1613 #ifdef SINGLE_NACK
1614 else
1615 {
1616 int aux_retval=0;
1617
1618 #ifdef DSINGLE_NACK
1619 fprintf( stderr, "DSINGLE_NACK rmcastProcessDataPacket: found a NAK_SND_WAIT event.\n" );
1620 #endif
1621 cache_aux1_entry=cacheLookForMember( &cache, aux_member_id );
1622
1623 #ifdef DWINDOW_MASK
1624 fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: before window_mask_get_bit\n" );
1625 fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: window_mask=%p\n",cache_aux1_entry->sm_info.member_status.window_mask );
1626 fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: sn=%d\n",i );
1627 fprintf( stderr, "DWINDOW_MASK rmcastProcessDataPacket: member_status=%p\n",&cache_aux1_entry->sm_info.member_status );
1628 #endif
1629
1630 aux_retval=window_mask_get_bit( cache_aux1_entry->sm_info.member_status.window_mask,i,&cache_aux1_entry->sm_info.member_status );
1631
1632 if( aux_retval != RET_RCV_WAIT ) /* FIXME - este teste pode nao ser necessario pois nao queremos eventos NAK e RET_RCV concorrentes */
1633 {
1634 if( cacheUpdateNakList( &cache, &( pckt_info.sender_id ), i )!=0 )
1635 {
1636 #ifdef DEBUG_EVENTS
1637 fprintf( stderr, "DEBUG_EVENTS rmcastProcessDataPacket: NAK_SND_WAIT event exists, just set sn=%d to NAK_SND_WAIT\n",i );
1638 #endif
1639 window_mask_set_bit( cache_aux1_entry->sm_info.member_status.window_mask,i,&cache_aux1_entry->sm_info.member_status,NAK_SND_WAIT );
1640 }
1641 else
1642 {
1643 /*
1644 * We have sent the max number of NAK( see rmcast_options structure ) we didn't recover the packet
1645 * so we must exit because we cannot keep the reliability
1646 */
1647
1648 fprintf( stderr, "********\n" );
1649 fprintf( stderr, "rmcastProcessDataPacket ERROR: Recovering packet failed. Max number( %d ) of NAKs reached!\n",rmcast_options.max_nak );
1650 fprintf( stderr, "\tHost IP:PID=%s:%d\tsn=%d\n",aux_member_id->ip,aux_member_id->pid,i );
1651 fprintf( stderr, "********\n" );
1652
1653 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1654 }
1655 }
1656 }
1657
1658 } /* i is in the window_mask range*/
1659 #endif /*SINGLE_NACK */
1660 } /* message exists we don't need to send a NACK */
1661 } /* for( i = old_cache_sender_entry.sm_info.member_status.last_identified+1; ... */
1662 } /*else if( retval==2 ) */
1663 else
1664 {
1665 #ifdef DEBUG_CACHE
1666 fprintf( stderr, "DEBUG_CACHE rmcastProcessDataPacket: Couldn�t insert cache node.\n" );
1667 #endif
1668 }
1669
1670 }
1671
1672 /*****************************************************************************************************
1673 *
1674 * void rmcastProcessRefreshPacket( PACKET_INFO pckt_info )
1675 *
1676 * Processes the refresh packet.
1677 *
1678 * Arguments: the packet to be processed, in pckt_info;
1679 *
1680 * Side effects: may affect the event list, the cache and the message queue.
1681 *
1682 ******************************************************************************************************/
1683 #ifdef REFRESH
rmcastProcessRefreshPacket(PACKET_INFO pckt_info)1684 void rmcastProcessRefreshPacket( PACKET_INFO pckt_info )
1685 {
1686 int i, new_value_of_last_identified;
1687 CACHE *cache_sender_entry;
1688 cache_sender_entry = cacheLookForMember( &cache, &( pckt_info.sender_id ) );
1689
1690 if( pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent < 0 )
1691 return;
1692
1693 if( cache_sender_entry == NULL )
1694 {
1695 cache_sender_entry = cacheInsertMember( &cache, &( pckt_info.sender_id ) );
1696 }
1697
1698 new_value_of_last_identified = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent;
1699
1700 #ifdef SINGLE_NACK
1701 if( new_value_of_last_identified >( cache_sender_entry->sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE ) )
1702 {
1703 /* Send NACKs that fit in the window */
1704 new_value_of_last_identified = cache_sender_entry->sm_info.member_status.last_seq_rcv + MAX_WINDOW_SIZE;
1705 }
1706
1707 for( i = new_value_of_last_identified;
1708 i > cache_sender_entry->sm_info.member_status.last_identified;
1709 i -- )
1710 {
1711 if( !rmcastFindEvent( &pckt_info,NAK_SND_WAIT ) )
1712 {
1713 if( !rmcastFindEvent( &pckt_info,RET_RCV_WAIT ) )
1714 {
1715 pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent = i;
1716 rmcastInsertEvent( &pckt_info,NAK_SND_WAIT );
1717 #ifdef DSINGLE_NACK
1718 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: NAK_SND_WAIT insertion caused by refresh message\n" );
1719 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: host:pid:sn=%s:%d:%d\n",
1720 pckt_info.sender_id.ip,pckt_info.sender_id.pid,i );
1721 #endif
1722 }
1723 else
1724 {
1725 /* RET_RCV_WAIT exists, just set the i bit in the window_mask */
1726 window_mask_set_bit( cache_sender_entry->sm_info.member_status.window_mask,
1727 i,
1728 &( cache_sender_entry->sm_info.member_status ),
1729 RET_RCV_WAIT );
1730 #ifdef DSINGLE_NACK
1731 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: bit settled to RET_RCV_WAIT caused by refresh message\n" );
1732 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: host:pid:sn=%s:%d:%d\n",
1733 pckt_info.sender_id.ip,pckt_info.sender_id.pid,i );
1734 #endif
1735 }
1736 }
1737 else
1738 {
1739 /* NAK_SND_WAIT exists, just set the i bit in the window_mask */
1740 window_mask_set_bit( cache_sender_entry->sm_info.member_status.window_mask,
1741 i,
1742 &( cache_sender_entry->sm_info.member_status ),
1743 NAK_SND_WAIT );
1744 #ifdef DSINGLE_NACK
1745 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: bit settled to NAK_RCV_WAIT caused by refresh message\n" );
1746 fprintf( stderr, "DSINGLE_NACK rmcastProcessRefreshPacket: host:pid:sn=%s:%d:%d\n",
1747 pckt_info.sender_id.ip,pckt_info.sender_id.pid,i );
1748 #endif
1749 }
1750 }
1751 #else
1752 for( i = pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent;
1753 i > cache_sender_entry->sm_info.member_status.last_identified;
1754 i -- )
1755 {
1756 #ifdef DEBUG_EVENTS
1757 fprintf( stderr, "Will wait to send a nak, i: %d last received or nak: %d.\n", i,
1758 cache_sender_entry->sm_info.member_status.last_identified );
1759 #endif
1760
1761 pckt_info.packet_data.refresh_packet.sn_of_last_msg_sent = i;
1762 rmcastInsertEvent( &pckt_info,NAK_SND_WAIT );
1763 }
1764 #endif /* SINGLE_NACK */
1765 if( cache_sender_entry->sm_info.member_status.last_identified < new_value_of_last_identified )
1766 cache_sender_entry->sm_info.member_status.last_identified = new_value_of_last_identified;
1767 }
1768 #endif
1769
1770 /*****************************************************************************************************
1771 *
1772 * int rmcastRemoveEvent( PACKET_INFO *packt, int event_type )
1773 *
1774 * Removes an event from the event list.
1775 *
1776 * Arguments: the network packet, in packt;
1777 * the event type, in event_type.
1778 *
1779 * Returns: 1, on sucess;
1780 * 0, otherwise.
1781 *
1782 * Side effects: it affects the event list.
1783 *
1784 ******************************************************************************************************/
rmcastRemoveEvent(PACKET_INFO * packt,int event_type)1785 int rmcastRemoveEvent( PACKET_INFO *packt, int event_type )
1786 {
1787 int retval;
1788
1789 retval = 0;
1790
1791 switch( packt->type )
1792 {
1793 case RETRANSM_PACKET_TYPE:
1794 retval = eventListRemove( &event_list, &( packt->packet_data.retransm_packet.original_sender_id ),
1795 event_type, packt->packet_data.retransm_packet.data_packet.sn );
1796 break;
1797
1798 case DATA_PACKET_TYPE:
1799 retval = eventListRemove( &event_list, &( packt->sender_id ),
1800 event_type, packt->packet_data.data_packet.sn );
1801 break;
1802 case NAK_PACKET_TYPE:
1803 #ifdef DNAK_RCV
1804 fprintf( stderr, "rmcastRemoveEvent: calling eventListRemove\n" );
1805 #endif
1806 retval = eventListRemove( &event_list, &( packt->packet_data.nak_packet.requested_member_id ),
1807 event_type, packt->packet_data.nak_packet.sn );
1808 #ifdef DNAK_RCV
1809 fprintf( stderr, "rmcastRemoveEvent: after calling eventListRemove\n" );
1810 #endif
1811 break;
1812 }
1813 return retval;
1814 }
1815
1816 /*****************************************************************************************************
1817 *
1818 * int rmcastFindEvent( PACKET_INFO *packt, int event_type )
1819 *
1820 * Searches for an event in the event list.
1821 *
1822 * Arguments: the network packet, in packt;
1823 * the event type, in event_type.
1824 *
1825 * Returns: 1, on sucess;
1826 * 0, otherwise.
1827 *
1828 ******************************************************************************************************/
rmcastFindEvent(PACKET_INFO * packt,int event_type)1829 int rmcastFindEvent( PACKET_INFO *packt, int event_type )
1830 {
1831 int retval;
1832
1833 EVENT_LIST *aux;
1834
1835 switch( packt->type )
1836 {
1837 case NAK_PACKET_TYPE:
1838 #ifdef DNAK_RCV
1839 fprintf( stderr, "rmcastFindEvent: calling eventlistFind to look for event:%d\n",event_type );
1840 #endif
1841
1842 retval = (( eventListFind( &event_list, &( packt->packet_data.nak_packet.requested_member_id ),
1843 event_type, packt->packet_data.nak_packet.sn,&aux ) )!= NULL );
1844 #ifdef DNAK_RCV
1845 fprintf( stderr, "rmcastFindEvent: after calling eventlistFind to look for event:%d, retval=%d\n",event_type,retval );
1846 #endif
1847 break;
1848 case DATA_PACKET_TYPE:
1849 retval = (( eventListFind( &event_list, &( packt->sender_id ),
1850 event_type, packt->packet_data.data_packet.sn,&aux ) )!=NULL );
1851 break;
1852 case REFRESH_PACKET_TYPE:
1853 retval = (( eventListFind( &event_list, &( packt->sender_id ),
1854 event_type, packt->packet_data.refresh_packet.sn_of_last_msg_sent,&aux ) )!=NULL );
1855 break;
1856 default:
1857 fprintf( stderr, "rmcastFindEvent ERROR: Not yet implemented event=%d packet_type=%d.\n",event_type,packt->type );
1858 return 0;
1859 }
1860 return retval;
1861 }
1862
1863 /*****************************************************************************************************
1864 *
1865 * int rmcastCacheContains( PACKET_INFO *packt )
1866 *
1867 * Searches for a packet in the cache.
1868 *
1869 * Arguments: the network packet, in packt;
1870 *
1871 * Returns: 1, on sucess;
1872 * 0, otherwise.
1873 *
1874 ******************************************************************************************************/
rmcastCacheContains(PACKET_INFO * packt)1875 int rmcastCacheContains( PACKET_INFO *packt )
1876 {
1877 int retval;
1878
1879 retval = 0;
1880
1881 switch( packt->type )
1882 {
1883 case NAK_PACKET_TYPE:
1884 retval = (( cacheLookForMessage( &cache, &( packt->packet_data.nak_packet.requested_member_id ),
1885 packt->packet_data.nak_packet.sn ) )!= NULL );
1886 break;
1887 case RETRANSM_PACKET_TYPE:
1888 retval = (( cacheLookForMessage( &cache, &( packt->packet_data.retransm_packet.original_sender_id ),
1889 packt->packet_data.retransm_packet.data_packet.sn ) )!= NULL );
1890 break;
1891 default:
1892 fprintf( stderr, "rmcastCacheContains: Unknown packet type:%d\n",packt->type );
1893 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1894 }
1895 return retval;
1896 }
1897
1898 /*****************************************************************************************************
1899 *
1900 * int rmcastInsertEvent( PACKET_INFO *packt, int event_type )
1901 *
1902 * Inserts an event in the event list.
1903 *
1904 * Arguments: the network packet, in packt;
1905 * the event type, in event_type.
1906 *
1907 * Returns: 1, on sucess;
1908 * 0, otherwise.
1909 *
1910 * Side effects: it affects the event list.
1911 *
1912 ******************************************************************************************************/
rmcastInsertEvent(PACKET_INFO * packt,int event_type)1913 int rmcastInsertEvent( PACKET_INFO *packt, int event_type )
1914 {
1915 int retval;
1916 CACHE *cache_sender_entry;
1917
1918 retval = 0;
1919
1920 switch( packt->type )
1921 {
1922 case NAK_PACKET_TYPE:
1923 cache_sender_entry = cacheLookForMember( &cache,
1924 &( packt->packet_data.nak_packet.requested_member_id ) );
1925 #ifdef DNAK_RCV
1926 fprintf( stderr, "rmcastInsertEvent: calling eventListInsert\n" );
1927 #endif
1928
1929 if( cache_sender_entry != NULL )
1930 retval = eventListInsert( &event_list, &( cache_sender_entry->sm_info.member_id ),
1931 packt->sender_id.ip, event_type, ( packt->packet_data.nak_packet.sn ) );
1932 #ifdef DNAK_RCV
1933 fprintf( stderr, "rmcastInsertEvent: after calling eventListInsert cache_sender_entry:%p\n",cache_sender_entry );
1934 #endif
1935
1936 break;
1937 case DATA_PACKET_TYPE:
1938 cache_sender_entry = cacheLookForMember( &cache, &( packt->sender_id ) );
1939 if( cache_sender_entry!=NULL )
1940 retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ),
1941 packt->sender_id.ip, event_type, packt->packet_data.data_packet.sn );
1942 break;
1943 case REFRESH_PACKET_TYPE:
1944 cache_sender_entry = cacheLookForMember( &cache, &( packt->sender_id ) );
1945
1946 if( cache_sender_entry!=NULL )
1947 retval = eventListInsert( &event_list,&( cache_sender_entry->sm_info.member_id ),
1948 packt->sender_id.ip,event_type, packt->packet_data.refresh_packet.sn_of_last_msg_sent );
1949
1950 break;
1951 default:
1952 fprintf( stderr, "rmcastInsertEvent: Unknown event type:%d\n",packt->type );
1953 RM_leaveGroup( local_user_info.socket, local_user_info.member_id.ip );
1954
1955 }
1956 #ifdef DNAK_RCV
1957 if( event_type == NAK_PACKET_TYPE )
1958 {
1959 fprintf( stderr, "rmcastInsertEvent: Show cache after NACK event insertion\n" );
1960 cacheShow( cache );
1961 }
1962 #endif
1963
1964 return retval;
1965 }
1966
1967 /**********End of the Routines to facilitate the interface with the event list and the cache ******/
1968 #endif
1969