1 /*
2  This file is part of pathload.
3 
4  pathload is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or
7  (at your option) any later version.
8 
9  pathload is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with pathload; if not, write to the Free Software
16  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19 /*-------------------------------------------------
20    pathload : an end-to-end available bandwidth
21               estimation tool
22    Author   : Manish Jain ( jain@cc.gatech.edu )
23               Constantinos Dovrolis (dovrolis@cc.gatech.edu )
24    Release  : Ver 1.3.2
25    Support  : This work was supported by the SciDAC
26               program of the US department
27 --------------------------------------------------*/
28 
29 /*
30  * $Header: /net/cvs/bwtest/pathload/pathload_rcv_func.c,v 1.294 2006/05/19 20:21:15 jain Exp $
31  */
32 
33 #include "pathload_gbls.h"
34 #include "pathload_rcv.h"
35 
recvfrom_latency(struct sockaddr_in rcv_udp_addr)36 l_int32 recvfrom_latency(struct sockaddr_in rcv_udp_addr)
37 {
38   char *random_data;
39   float min_OSdelta[50], ord_min_OSdelta[50];
40   l_int32 j ;
41   struct timeval current_time, first_time ;
42 
43   if ( (random_data = malloc(max_pkt_sz*sizeof(char)) ) == NULL )
44   {
45     printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz);
46     exit(-1);
47   }
48   srandom(getpid()); /* Create random payload; does it matter? */
49   for (j=0; j<max_pkt_sz-1; j++) random_data[j]=(char)(random()&0x000000ff);
50 
51   for (j=0; j<50; j++)
52   {
53     if ( sendto(sock_udp, random_data, max_pkt_sz, 0,
54          (struct sockaddr*)&rcv_udp_addr,sizeof(rcv_udp_addr)) == -1)
55         perror("recvfrom_latency");
56     gettimeofday(&first_time, NULL);
57     recvfrom(sock_udp, random_data, max_pkt_sz, 0, NULL, NULL);
58     gettimeofday(&current_time, NULL);
59     min_OSdelta[j]= time_to_us_delta(first_time, current_time);
60   }
61   /* Use median  of measured latencies to avoid outliers */
62   order_float(min_OSdelta, ord_min_OSdelta,0,50);
63   free(random_data);
64   return((l_int32)ord_min_OSdelta[25]);
65 }
66 
67 
get_adr()68 double get_adr()
69 {
70   struct timeval select_tv,arrv_tv[MAX_STREAM_LEN] ;
71   double delta ;
72   double bw_msr = 0;
73   double bad_bw_msr[10] ;
74   int num_bad_train=0 ;
75   int first = 1 ;
76   double sum =0 ;
77   l_int32 exp_train_id ;
78   l_int32 bad_train = 1;
79   l_int32 retry = 0 ;
80   l_int32 ctr_code ;
81   l_int32 ctr_msg_rcvd ;
82   l_int32 train_len=0;
83   l_int32 last=0,i;
84   l_int32 spacecnt=24 ;
85   char ctr_buff[8];
86   l_int32 num_burst;
87 
88   if (Verbose)
89     printf("  ADR [");
90   fflush(stdout);
91   fprintf(pathload_fp,"  ADR [");
92   fflush(pathload_fp);
93   ctr_code = SEND_TRAIN | CTR_CODE ;
94   send_ctr_mesg(ctr_buff, ctr_code);
95   exp_train_id = 0 ;
96   for(i=0;i<100;i++)
97   {
98     arrv_tv[i].tv_sec=0;
99     arrv_tv[i].tv_usec=0;
100   }
101   while ( retry < MAX_TRAIN && bad_train )
102   {
103     if ( train_len == 5)
104       train_len = 3;
105     else
106       train_len = TRAIN_LEN - exp_train_id*15;
107     if (Verbose)
108       printf(".");
109     fflush(stdout);
110     fprintf(pathload_fp,".");
111     spacecnt--;
112     ctr_msg_rcvd = 0 ;
113     bad_train = recv_train(exp_train_id, arrv_tv, train_len);
114     /* Compute dispersion and bandwidth measurement */
115     if (!bad_train)
116     {
117       num_burst=0;
118       interrupt_coalescence=check_intr_coalescence(arrv_tv,train_len,&num_burst);
119       last=train_len;
120       while(!arrv_tv[last].tv_sec) --last;
121       delta = time_to_us_delta(arrv_tv[1], arrv_tv[last]);
122       bw_msr = ((28+max_pkt_sz) << 3) * (last-1) / delta;
123       /* tell sender that it was agood train.*/
124       ctr_code = GOOD_TRAIN | CTR_CODE ;
125       send_ctr_mesg(ctr_buff, ctr_code ) ;
126     }
127     else
128     {
129       retry++ ;
130       /* wait for atleast 10msec before requesting another train */
131       last=train_len;
132       while(!arrv_tv[last].tv_sec) --last;
133       first=1 ;
134       while(!arrv_tv[first].tv_sec) ++first ;
135       delta = time_to_us_delta(arrv_tv[first], arrv_tv[last]);
136       bad_bw_msr[num_bad_train++] = ((28+max_pkt_sz) << 3) * (last-first-1) / delta;
137       select_tv.tv_sec=0;select_tv.tv_usec=10000;
138       select(0,NULL,NULL,NULL,&select_tv);
139       ctr_code = BAD_TRAIN | CTR_CODE ;
140       send_ctr_mesg(ctr_buff, ctr_code ) ;
141       exp_train_id++ ;
142     }
143   }
144 
145   if (Verbose)
146   {
147     i = spacecnt;
148     putchar(']');
149     while(--i>0)putchar(' ');
150     printf(":: ");
151   }
152   fputc(']',pathload_fp);
153   while(--spacecnt>0)fputc(' ',pathload_fp);
154   fprintf(pathload_fp,":: ");
155   if ( !bad_train)
156   {
157     if(Verbose)
158       printf("%.2fMbps\n", bw_msr ) ;
159     fprintf(pathload_fp,"%.2fMbps\n", bw_msr ) ;
160   }
161   else
162   {
163     for ( i=0;i<num_bad_train;i++)
164       if ( finite(bad_bw_msr[i]))
165         sum += bad_bw_msr[i] ;
166     bw_msr = sum/num_bad_train ;
167     if(Verbose)
168       printf("%.2fMbps (I)\n", bw_msr ) ;
169     fprintf(pathload_fp,"%.2fMbps (I)\n", bw_msr ) ;
170   }
171   return bw_msr ;
172 }
173 
174 /* Receive a complete packet train from the sender */
/*
  Receive one packet train from the sender.

  exp_train_id : train id every packet must carry.
  time         : output array; time[k] receives the arrival time of packet
                 k.  One extra "end of train" stamp is written after the
                 loop, so the array needs at least train_len+2 entries.
  train_len    : highest packet id expected (train_len+1 packets in all).

  Packets are taken from sock_udp until the sender announces
  FINISHED_TRAIN on the control connection (polled with select() here, or
  signalled by the ctrl_listen thread when built with THRLIB).

  Returns 0 for a complete, in-order train and 1 otherwise.  Exits the
  process if the packet buffer or the listener thread cannot be created.
*/
l_int32 recv_train( l_int32 exp_train_id, struct timeval *time,l_int32 train_len)
{
  struct sigaction sigstruct ;
  struct timeval current_time;
  l_int32 pack_id = train_len ;  /* default keeps the final stamp at train_len+1 */
  l_int32 exp_pack_id = 0 ;
  l_int32 bad_train = 0 ;
  l_int32 train_id  ;
  l_int32 rcvd=0;
  l_int32 finished=0;
  l_int32 end_idx;
  long pkt_bytes;
  char *pack_buf ;
#ifdef THRLIB
  thr_arg arg ;
  pthread_t tid;
  pthread_attr_t attr ;
#else
  struct timeval select_tv;
  fd_set readset;
  l_int32 ret_val ;
  int max_fd;
  char ctr_buff[8];
#endif

  if ( ( pack_buf = malloc(max_pkt_sz*sizeof(char))) == NULL )
  {
    printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz);
    exit(-1);
  }

  /* Install a do-nothing SIGUSR1 handler for the duration of the train. */
  sigstruct.sa_handler = sig_sigusr1 ;
  sigemptyset(&sigstruct.sa_mask);
  sigstruct.sa_flags = 0 ;
  #ifdef SA_INTERRUPT
    sigstruct.sa_flags |= SA_INTERRUPT ;
  #endif
  sigaction(SIGUSR1 , &sigstruct,NULL );

#ifdef THRLIB
  arg.finished_stream=0;
  arg.ptid = pthread_self() ;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
  if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 )
  {
    perror("recv_train::pthread_create");
    fprintf(stdout,"Failed to create thread. exiting...\n");
    fprintf(pathload_fp,"Failed to create thread. exiting...\n");
    exit(-1);
  }
  pthread_attr_destroy(&attr);
#else
  /* select() needs the highest descriptor + 1, whichever socket that is. */
  max_fd = sock_tcp > sock_udp ? sock_tcp : sock_udp ;
#endif

  while ( !finished )
  {
#ifndef THRLIB
    FD_ZERO(&readset);
    FD_SET(sock_tcp,&readset);
    FD_SET(sock_udp,&readset);
    select_tv.tv_sec=1000;select_tv.tv_usec=0;
    if (select(max_fd+1,&readset,NULL,NULL,&select_tv) <= 0 )
      continue;
    if (FD_ISSET(sock_udp,&readset) )
#endif
    {
      pkt_bytes = recvfrom(sock_udp, pack_buf, max_pkt_sz, 0, NULL, NULL);
      if ( pkt_bytes != -1 )
      {
        gettimeofday(&current_time, NULL);
        if ( pkt_bytes < (long)(2*sizeof(l_int32)) )
        {
          /* Too short to hold the two id fields: do not parse bytes
             that were never received. */
          bad_train=1;
        }
        else
        {
          memcpy(&train_id, pack_buf, sizeof(l_int32));
          train_id = ntohl(train_id) ;
          memcpy(&pack_id, pack_buf+sizeof(l_int32), sizeof(l_int32));
          pack_id=ntohl(pack_id);
          /* Only in-order packets inside the announced train length may
             be stored; anything else marks the train bad. */
          if (train_id == exp_train_id && pack_id==exp_pack_id &&
              exp_pack_id <= train_len )
          {
            rcvd++;
            time[pack_id] = current_time ;
            exp_pack_id++;
          }
          else bad_train=1;
        }
      }
    }
#ifndef THRLIB
    if ( FD_ISSET(sock_tcp,&readset) )
    {
      /* check the control connection.*/
      ret_val = recv_ctr_mesg(sock_tcp,ctr_buff);
      if ( ret_val != -1 &&
           (((ret_val & CTR_CODE) >> 31) == 1) &&
           ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) )
        finished = 1;
    }
#else
    finished = arg.finished_stream;
#endif
  }

  if ( rcvd != train_len+1 ) bad_train=1;

  /* Stamp the slot after the last packet id seen.  The id comes from the
     network, so fall back to train_len+1 when it is outside the train. */
  end_idx = ( pack_id >= 0 && pack_id <= train_len ) ? pack_id+1 : train_len+1 ;
  if ( end_idx >= 0 )
    gettimeofday(&time[end_idx], NULL);

  sigstruct.sa_handler = SIG_DFL ;
  sigemptyset(&sigstruct.sa_mask);
  sigstruct.sa_flags = 0 ;
  sigaction(SIGUSR1 , &sigstruct,NULL );
  free(pack_buf);
  return bad_train ;
}
280 
/*
  Look for back-to-back arrivals in a list of packet timestamps.

  time  : arrival timestamps; entries 1 .. len-1 are compared pairwise.
  len   : number of entries to consider.
  burst : in/out counter, incremented once each time a gap larger than
          the threshold is met while at least three back-to-back
          arrivals have accumulated since the last counted burst.

  The threshold is the larger of MIN_TIME_INTERVAL and 3*rcv_latency.
  Returns 1 when more than 60% of <len> arrivals were back-to-back,
  otherwise 0.
*/
l_int32 check_intr_coalescence(struct timeval time[],l_int32 len, l_int32 *burst)
{
  double gap;            /* a scalar suffices; no per-packet array whose
                            size would have to match len */
  l_int32 b2b=0,run=0;
  l_int32 i;
  l_int32 min_gap;

  min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL : 3*rcv_latency ;
  for (i=2;i<len;i++)
  {
    gap = time_to_us_delta(time[i-1],time[i]);
    if ( gap <= min_gap )
    {
      b2b++ ;
      run++;
    }
    else if ( run >= 3 )
    {
      /* run is only cleared once it has been counted as a burst */
      (*burst)++;
      run=0;
    }
  }

  return ( b2b > .6*len ) ? 1 : 0 ;
}
315 
316 /*
317    Receive N streams .
318    After each stream, compute the loss rate.
319    Mark a stream "lossy" if the loss rate in
320    that stream is more than a threshold.
321 */
recv_fleet()322 l_int32 recv_fleet()
323 {
324   struct sigaction sigstruct ;
325   struct timeval snd_tv[MAX_STREAM_LEN], arrv_tv[MAX_STREAM_LEN];
326   struct timeval current_time, first_time;
327   double pkt_loss_rate ;
328   double owd[MAX_STREAM_LEN] ;
329   double snd_tm[MAX_STREAM_LEN] ;
330   double arrv_tm[MAX_STREAM_LEN];
331   l_int32 ctr_code ;
332   l_int32 pkt_lost = 0 ;
333   l_int32 stream_id_n , stream_id=0 ;
334   l_int32 total_pkt_rcvd=0 ,pkt_rcvd = 0 ;
335   l_int32 pkt_id = 0 ;
336   l_int32 pkt_id_n = 0 ;
337   l_int32 exp_pkt_id = 0 ;
338   l_int32 stream_cnt = 0 ; /* 0->n*/
339   l_int32 fleet_id  , fleet_id_n = 0 ;
340   l_int32 lossy_stream = 0 ;
341   l_int32 return_val = 0 ;
342   l_int32 finished_stream = 0 ;
343   l_int32 stream_duration ;
344   l_int32 num_sndr_cs[20],num_rcvr_cs[20];
345   char ctr_buff[8];
346   char *pkt_buf ;
347   double owdfortd[MAX_STREAM_LEN];
348   l_int32 num_substream,substream[MAX_STREAM_LEN];
349   l_int32 low,high,len,j;
350   l_int32 b2b_pkt_per_stream[20];
351   l_int32 tmp_b2b;
352 #ifdef THRLIB
353   pthread_t tid ;
354   thr_arg arg ;
355   pthread_attr_t attr ;
356 #endif
357   l_int32 num_bursts;
358   l_int32 abort_fleet=0;
359   l_int32 p=0;
360   struct timeval select_tv;
361   fd_set readset;
362   l_int32 ret_val ;
363 
364   if ( (pkt_buf = malloc(cur_pkt_sz*sizeof(char)) ) == NULL )
365   {
366     printf("ERROR : unable to malloc %ld bytes \n",cur_pkt_sz);
367     exit(-1);
368   }
369   trend_idx=0;
370   ic_flag = 0;
371   if(verbose&&!Verbose)
372     printf("Receiving Fleet %ld, Rate %.2fMbps\n",exp_fleet_id,tr);
373   if(Verbose)
374   {
375     printf("\nReceiving Fleet %ld\n",exp_fleet_id);
376     printf("  Fleet Parameter(req)  :: R=%.2fMbps, L=%ldB, K=%ldpackets, \
377 T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval) ;
378   }
379   fprintf(pathload_fp,"\nReceiving Fleet %ld\n",exp_fleet_id);
380   fprintf(pathload_fp,"  Fleet Parameter(req)  :: R=%.2fMbps, L=%ldB, \
381 K=%ldpackets, T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval);
382 
383   if(Verbose)
384     printf("  Lossrate per stream   :: ");
385   fprintf(pathload_fp,"  Lossrate per stream   :: ");
386 
387   sigstruct.sa_handler = sig_sigusr1 ;
388   sigemptyset(&sigstruct.sa_mask);
389   sigstruct.sa_flags = 0 ;
390   #ifdef SA_INTERRUPT
391     sigstruct.sa_flags |= SA_INTERRUPT ;
392   #endif
393   sigaction(SIGUSR1 , &sigstruct,NULL );
394 
395   while ( stream_cnt < num_stream  )
396   {
397 #ifdef THRLIB
398     arg.finished_stream=0;
399     arg.ptid = pthread_self() ;
400     arg.stream_cnt = stream_cnt ;
401     pthread_attr_init(&attr);
402     pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
403     if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 )
404     {
405       perror("recv_fleet::pthread_create");
406       exit(-1);
407     }
408 #endif
409     pkt_lost = 0 ;
410     first_time.tv_sec = 0 ;
411     for (j=0; j < stream_len; j++ )
412     {
413       snd_tv[j].tv_sec=0 ; snd_tv[j].tv_usec=0 ;
414       arrv_tv[j].tv_sec=0; arrv_tv[j].tv_usec=0;
415     }
416 
417     /* Receive K packets of ith stream */
418 #ifdef THRLIB
419     while(!arg.finished_stream)
420 #else
421     while(1)
422 #endif
423     {
424 #ifndef THRLIB
425       FD_ZERO(&readset);
426       FD_SET(sock_tcp,&readset);
427       FD_SET(sock_udp,&readset);
428       select_tv.tv_sec=1000;select_tv.tv_usec=0;
429       if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 )
430       {
431         if (FD_ISSET(sock_udp,&readset) )
432         {
433 #endif
434           if( recvfrom(sock_udp,pkt_buf,cur_pkt_sz,0,NULL,NULL) > 0 )
435           {
436             gettimeofday(&current_time,NULL);
437             memcpy(&fleet_id_n,pkt_buf , sizeof(l_int32));
438             fleet_id = ntohl(fleet_id_n) ;
439             memcpy(&stream_id_n,pkt_buf+sizeof(l_int32) , sizeof(l_int32));
440         stream_id = ntohl(stream_id_n) ;
441         memcpy(&pkt_id_n, pkt_buf+2*sizeof(l_int32), sizeof(l_int32));
442         pkt_id = ntohl(pkt_id_n) ;
443         if ( fleet_id == exp_fleet_id  && stream_id == stream_cnt &&
444              pkt_id >= exp_pkt_id )
445         {
446           if ( first_time.tv_sec == 0 )
447             first_time = current_time;
448           arrv_tv[pkt_id] = current_time ;
449           memcpy(&(snd_tv[pkt_id].tv_sec) , pkt_buf+3*sizeof(l_int32), sizeof(l_int32));
450           memcpy(&(snd_tv[pkt_id].tv_usec), pkt_buf+4*sizeof(l_int32), sizeof(l_int32));
451           if ( pkt_id > exp_pkt_id ) /* reordered are considered as lost */
452           {
453             pkt_lost += ( pkt_id - exp_pkt_id ) ;
454             exp_pkt_id  = pkt_id ;
455           }
456           ++exp_pkt_id ;
457           ++pkt_rcvd;
458         }
459       } // end of recvfrom
460 #ifndef THRLIB
461         } // end of FD_ISSET
462 
463         if ( FD_ISSET(sock_tcp,&readset) )
464         {
465           /* check the control connection.*/
466           if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 )
467           {
468             if ( (((ret_val & CTR_CODE) >> 31) == 1) &&
469                   ((ret_val & 0x7fffffff) == FINISHED_STREAM ) )
470             {
471               while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ;
472               if ( ret_val == stream_cnt )
473               {
474                 break ;
475               }
476             }
477           }
478         }
479       } // end of select
480       else
481       {
482         perror("select");
483         exit(0);
484       }
485 #endif
486 
487     }
488 
489     for (j=0; j < stream_len; j++ )
490     {
491       snd_tv[j].tv_sec = ntohl(snd_tv[j].tv_sec);
492       snd_tv[j].tv_usec = ntohl(snd_tv[j].tv_usec);
493       snd_tm[j]= snd_tv[j].tv_sec * 1000000.0 + snd_tv[j].tv_usec ;
494       arrv_tm[j] = arrv_tv[j].tv_sec * 1000000.0 + arrv_tv[j].tv_usec ;
495       owd[j] =  arrv_tm[j] - snd_tm[j]   ;
496     }
497 
498     total_pkt_rcvd += pkt_rcvd ;
499     finished_stream = 0 ;
500     pkt_lost +=  stream_len  - exp_pkt_id ;
501     pkt_loss_rate = (double )pkt_lost * 100. / stream_len ;
502     if(Verbose)
503       printf(":%.1f",pkt_loss_rate ) ;
504     fprintf(pathload_fp,":%.1f",pkt_loss_rate ) ;
505     exp_pkt_id = 0 ;
506     stream_cnt++ ;
507 
508     num_bursts=0;
509     if ( interrupt_coalescence )
510       ic_flag=check_intr_coalescence(arrv_tv,pkt_rcvd,&num_bursts);
511 
512     if ( pkt_loss_rate < HIGH_LOSS_RATE && pkt_loss_rate >= MEDIUM_LOSS_RATE )
513       lossy_stream++ ;
514 
515     if ( pkt_loss_rate >= HIGH_LOSS_RATE || ( stream_cnt >= num_stream
516          && lossy_stream*100./stream_cnt >= MAX_LOSSY_STREAM_FRACTION ))
517     {
518       if ( increase_stream_len )
519       {
520         increase_stream_len=0;
521         lower_bound=1;
522       }
523 
524       if(Verbose)
525         printf("\n  Fleet aborted due to high lossrate");
526       fprintf(pathload_fp,"\n  Fleet aborted due to high lossrate");
527       abort_fleet=1;
528       break ;
529     }
530     else
531     {
532       /* analyze trend in stream */
533       num += get_sndr_time_interval(snd_tm,&snd_time_interval) ;
534       adjust_offset_to_zero(owd, stream_len);
535       num_substream = eliminate_sndr_side_CS(snd_tm,substream);
536       num_sndr_cs[stream_cnt-1] = num_substream ;
537       substream[num_substream++]=stream_len-1;
538       low=0;
539       num_rcvr_cs[stream_cnt-1]=0;
540       tmp_b2b=0;
541       for (j=0;j<num_substream;j++)
542       {
543         high=substream[j];
544         if ( ic_flag )
545         {
546           if ( num_bursts < 2 )
547           {
548             if ( ++repeat_1 == 3)
549             {
550               repeat_1=0;
551               /* Abort fleet and try to find lower bound */
552               abort_fleet=1;
553               lower_bound=1;
554               increase_stream_len=0;
555               break ;
556             }
557           }
558           else if ( num_bursts <= 5 )
559           {
560             if ( ++repeat_2 == 3)
561             {
562               repeat_2=0;
563               /* Abort fleet and retry with longer stream length */
564               abort_fleet=1;
565               increase_stream_len=1;
566               break ;
567             }
568           }
569           else
570           {
571             increase_stream_len=0;
572             len=eliminate_b2b_pkt_ic(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b);
573             /*
574             for(p=0;p<len;p++)
575               printf("%d %f\n",p,owdfortd[p]);
576             */
577             pct_metric[trend_idx]=
578               pairwise_comparision_test(owdfortd , 0 , len );
579             pdt_metric[trend_idx]=
580               pairwise_diff_test(owdfortd , 0, len );
581             trend_idx+=1;
582           }
583         }
584         else
585         {
586           len=eliminate_rcvr_side_CS(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b);
587           if ( len > MIN_STREAM_LEN )
588           {
589             get_trend(owdfortd,len);
590           }
591         }
592         low=high+1;
593       }
594       if ( abort_fleet )
595         break;
596       else
597       {
598         b2b_pkt_per_stream[stream_cnt-1] = tmp_b2b ;
599         ctr_code = CONTINUE_STREAM | CTR_CODE;
600         send_ctr_mesg(ctr_buff, ctr_code);
601       }
602     }
603     pkt_rcvd = 0 ;
604 
605     /* A hack for slow links */
606     stream_duration = stream_len * time_interval ;
607     if ( stream_duration >= 500000 )
608     {
609       slow=1;
610       break ;
611     }
612   }  /*end of while (stream_cnt < num_stream ). */
613 
614   if ( Verbose ) printf("\n");
615   fprintf(pathload_fp,"\n");
616 
617   if ( abort_fleet )
618   {
619     printf("\tAborting fleet. Stream_cnt %d\n",stream_cnt);
620      ctr_code = ABORT_FLEET | CTR_CODE;
621      send_ctr_mesg(ctr_buff , ctr_code ) ;
622      return_val = -1 ;
623   }
624   else
625      print_contextswitch_info(num_sndr_cs,num_rcvr_cs,b2b_pkt_per_stream,stream_cnt);
626 
627   exp_fleet_id++ ;
628   free(pkt_buf);
629   return return_val  ;
630 }
631 
/*
  Helper for print_contextswitch_info: write one labelled row of <count>
  values to the log file, and to stdout as well when Verbose is set.
*/
static void print_cs_row(const char *label, l_int32 values[], l_int32 count)
{
    l_int32 k;

    if (Verbose)
      printf("%s", label);
    fprintf(pathload_fp, "%s", label);

    for (k = 0; k < count; k++)
    {
      if (Verbose)
        printf(":%2d", values[k]);
      fprintf(pathload_fp, ":%2d", values[k]);
    }

    if (Verbose)
      printf("\n");
    fprintf(pathload_fp, "\n");
}

/*
  Print three per-stream rows: num_sndr_cs ("CS @ sndr"), num_rcvr_cs
  ("CS @ rcvr") and discard ("DS @ rcvr").  Each row lists the first
  stream_cnt-1 entries of its array.
*/
void print_contextswitch_info(l_int32 num_sndr_cs[], l_int32 num_rcvr_cs[],l_int32 discard[],l_int32 stream_cnt)
{
    const l_int32 shown = stream_cnt - 1;

    print_cs_row("  # of CS @ sndr        :: ", num_sndr_cs, shown);
    print_cs_row("  # of CS @ rcvr        :: ", num_rcvr_cs, shown);
    print_cs_row("  # of DS @ rcvr        :: ", discard, shown);
}
669 
/*
  SIGUSR1 handler installed by recv_train() and recv_fleet().  It does
  nothing on purpose: the signal only has to be delivered, not acted on.
*/
void sig_sigusr1()
{
}
674 
sig_alrm()675 void sig_alrm()
676 {
677   terminate_gracefully(exp_start_time);
678   exit(0);
679 }
680 
/*
  Thread body (THRLIB builds only; otherwise it just returns NULL).

  Waits up to 100 seconds for one message on the control connection.
    - FINISHED_STREAM: reads the following message and, if it equals the
      stream_cnt in <arg>, sets finished_stream, sends SIGUSR1 to the
      thread stored in ptid and exits the thread.
    - FINISHED_TRAIN: pauses 2 msec, then sets finished_stream, sends
      SIGUSR1 to ptid and exits the thread.
  Any other outcome (timeout, read failure, other message) returns NULL
  without touching <arg>.

  arg : pointer to the caller's thr_arg.
*/
void *ctrl_listen(void *arg)
{
#ifdef THRLIB
  thr_arg *ctl = (thr_arg *)arg;
  struct timeval wait_tv;
  fd_set ctrl_set;
  l_int32 msg ;
  char ctr_buff[8];

  FD_ZERO(&ctrl_set);
  FD_SET(sock_tcp,&ctrl_set);
  wait_tv.tv_sec=100;
  wait_tv.tv_usec=0;
  if (select(sock_tcp+1,&ctrl_set,NULL,NULL,&wait_tv) <= 0 )
    return NULL;
  if ( !FD_ISSET(sock_tcp,&ctrl_set) )
    return NULL;

  /* check the control connection.*/
  msg = recv_ctr_mesg(sock_tcp,ctr_buff);
  if ( msg == -1 )
    return NULL;
  if ( ((msg & CTR_CODE) >> 31) != 1 )
    return NULL;

  if ( (msg & 0x7fffffff) == FINISHED_STREAM )
  {
    /* the id of the finished stream follows as a separate message */
    do
    {
      msg = recv_ctr_mesg(sock_tcp,ctr_buff );
    } while ( msg == -1 );

    if ( msg == ctl->stream_cnt )
    {
      ctl->finished_stream =1 ;
      pthread_kill(ctl->ptid,SIGUSR1);
      pthread_exit(NULL);
    }
  }
  else if ( (msg & 0x7fffffff) == FINISHED_TRAIN )
  {
    wait_tv.tv_usec = 2000 ;
    wait_tv.tv_sec = 0 ;
    select(1,NULL,NULL,NULL,&wait_tv);
    ctl->finished_stream =1 ;
    pthread_kill(ctl->ptid,SIGUSR1);
    pthread_exit(NULL);
  }
#endif
  return NULL;
}
727 
/*
  Reduce a delay series to group medians and record its trend metrics.

  owdfortd : input series.
  pkt_cnt  : number of entries in owdfortd.

  The series is cut into consecutive groups of floor(sqrt(pkt_cnt))
  entries (the last group takes whatever is left).  The median of each
  group is collected, and the PCT and PDT metrics of the median sequence
  are stored at pct_metric[trend_idx] / pdt_metric[trend_idx]; trend_idx
  is then advanced by one.
*/
void get_trend(double owdfortd[],l_int32 pkt_cnt )
{
  double group_median[MAX_STREAM_LEN];
  double sorted[MAX_STREAM_LEN];
  l_int32 num_groups = 0;
  l_int32 group_sz, start, n, mid;

  group_sz = (int)floor(sqrt((double)pkt_cnt));

  for ( start = 0 ; start < pkt_cnt ; start += group_sz )
  {
    /* size of this group: a full group, or the remainder at the tail */
    n = pkt_cnt - start ;
    if ( n > group_sz )
      n = group_sz ;

    order_dbl(owdfortd , sorted , start , n ) ;

    mid = n / 2 ;
    if ( n % 2 != 0 )
      group_median[num_groups] = sorted[mid] ;
    else
      group_median[num_groups] = ( sorted[mid-1] + sorted[mid] )/2 ;
    num_groups++ ;
  }

  pct_metric[trend_idx] = pairwise_comparision_test(group_median , 0 , num_groups );
  pdt_metric[trend_idx] = pairwise_diff_test(group_median , 0, num_groups );
  trend_idx += 1;
}
756 
757 /*
758   Order an array of doubles using insertion sort
759 */
/*
  Copy num_elems doubles starting at unord_arr[start] into ord_arr[0..]
  and sort ord_arr in ascending order (insertion sort).
  unord_arr is not modified.
*/
void order_dbl(double unord_arr[], double ord_arr[],l_int32 start, l_int32 num_elems)
{
  l_int32 n, pos;
  double key;

  for (n = 0; n < num_elems; n++)
    ord_arr[n] = unord_arr[start + n];

  for (n = 1; n < num_elems; n++)
  {
    /* slide larger neighbours up, then drop the key into the gap */
    key = ord_arr[n];
    pos = n;
    while (pos > 0 && key < ord_arr[pos - 1])
    {
      ord_arr[pos] = ord_arr[pos - 1];
      pos--;
    }
    ord_arr[pos] = key;
  }
}
778 
779 /*
780     Order an array of floats using insertion sort
781 */
/*
  Copy num_elems floats starting at unord_arr[start] into ord_arr[0..]
  and sort ord_arr in ascending order (insertion sort).
  unord_arr is not modified.
*/
void order_float(float unord_arr[], float ord_arr[],l_int32 start, l_int32 num_elems)
{
  l_int32 n, pos;
  float key;

  for (n = 0; n < num_elems; n++)
    ord_arr[n] = unord_arr[start + n];

  for (n = 1; n < num_elems; n++)
  {
    /* slide larger neighbours up, then drop the key into the gap */
    key = ord_arr[n];
    pos = n;
    while (pos > 0 && key < ord_arr[pos - 1])
    {
      ord_arr[pos] = ord_arr[pos - 1];
      pos--;
    }
    ord_arr[pos] = key;
  }
}
799 
800 /*
801     Order an array of l_int32 using insertion sort
802 */
/*
  Copy the first num_elems values of unord_arr into ord_arr and sort
  ord_arr in ascending order (insertion sort).
  unord_arr is not modified.
*/
void order_int(l_int32 unord_arr[], l_int32 ord_arr[], l_int32 num_elems)
{
    l_int32 n, pos;
    l_int32 key;

    for (n = 0; n < num_elems; n++)
        ord_arr[n] = unord_arr[n];

    for (n = 1; n < num_elems; n++)
    {
        /* slide larger neighbours up, then drop the key into the gap */
        key = ord_arr[n];
        pos = n;
        while (pos > 0 && key < ord_arr[pos - 1])
        {
            ord_arr[pos] = ord_arr[pos - 1];
            pos--;
        }
        ord_arr[pos] = key;
    }
}
818 
819 /*
820     Send a message through the control stream
821 */
/*
  Send one control code to the peer over sock_tcp.

  ctr_buff : caller-supplied scratch buffer, at least sizeof(l_int32)
             bytes; receives the code in network byte order.
  ctr_code : code to send (host byte order).

  Terminates the process if the full code could not be written.
*/
void send_ctr_mesg(char *ctr_buff, l_int32 ctr_code)
{
    const l_int32 wire_code = htonl(ctr_code);
    long sent;

    memcpy(ctr_buff, &wire_code, sizeof(l_int32));
    sent = write(sock_tcp, ctr_buff, sizeof(l_int32));
    if (sent == (long)sizeof(l_int32))
        return;

    fprintf(stderr, "send control message failed:\n");
    exit(-1);
}
832 
833 
834 /*
835   Receive message from the control stream
836 */
/*
  Read one control code from the control stream.

  ctr_strm : descriptor to read from.
  ctr_buff : caller-supplied scratch buffer, at least sizeof(l_int32)
             bytes.

  Records the time before the read in first_time and the time after a
  complete read in second_time.

  Returns the code in host byte order, or -1 if read() fails or reports
  end-of-file before a whole code has arrived.
*/
l_int32 recv_ctr_mesg(l_int32 ctr_strm, char *ctr_buff)
{
  l_int32 ctr_code;
  size_t got = 0;
  long n;

  gettimeofday(&first_time,0);
  /* A stream socket may deliver fewer bytes than requested.  Keep
     reading until the whole code is in; returning early would leave the
     remaining bytes to be misread as the start of the next message. */
  while (got < sizeof(l_int32))
  {
    n = read(ctr_strm, ctr_buff + got, sizeof(l_int32) - got);
    if (n <= 0)
      return(-1);
    got += (size_t)n;
  }
  gettimeofday(&second_time,0);
  memcpy(&ctr_code, ctr_buff, sizeof(l_int32));
  return(ntohl(ctr_code));
}
847 
848 /*
849   Compute the time difference in microseconds
850   between two timeval measurements
851 */
/*
  Return tv2 - tv1 in microseconds (negative when tv2 is earlier).

  The arithmetic is done in double so that the seconds-to-microseconds
  scaling cannot overflow on platforms where tv_sec is a 32-bit integer.
*/
double time_to_us_delta(struct timeval tv1, struct timeval tv2)
{
  double sec_diff  = (double)tv2.tv_sec  - (double)tv1.tv_sec;
  double usec_diff = (double)tv2.tv_usec - (double)tv1.tv_usec;

  return sec_diff * 1000000.0 + usec_diff;
}
859 
860 /*
861   Compute the average of the set of measurements <data>.
862 */
/*
  Arithmetic mean of the first num_values entries of data.
  The sum is divided by num_values as-is, so the caller must pass a
  non-zero count.
*/
double get_avg(double data[], l_int32 num_values)
{
  double total = 0;
  l_int32 k = 0;

  while (k < num_values)
  {
    total += data[k];
    k++;
  }
  return total / (double)num_values;
}
871 
872 /*
873     PCT test to detect increasing trend in stream
874 */
/*
  PCT metric over array[start .. end-1].

  Counts the adjacent pairs in which the later value is strictly larger
  and divides that count by (end - start).
  Returns -1 when the range holds fewer than MIN_PARTITIONED_STREAM_LEN
  entries.
*/
double pairwise_comparision_test (double array[] ,l_int32 start , l_int32 end)
{
  l_int32 span = end - start ;
  l_int32 rises = 0 ;
  l_int32 k ;

  if ( span < MIN_PARTITIONED_STREAM_LEN )
    return -1 ;

  for ( k = start + 1 ; k < end ; k++ )
  {
    if ( array[k-1] < array[k] )
      rises++ ;
  }
  return (double)rises / (double)span ;
}
893 
894 /*
895     PDT test to detect increasing trend in stream
896 */
/*
  PDT metric over array[start .. end-1].

  Returns the sum of the adjacent differences divided by the sum of
  their absolute values.
  Returns 2 when the range holds fewer than MIN_PARTITIONED_STREAM_LEN
  entries.

  NOTE(review): when all values are equal both sums are 0 and the result
  is 0/0 - confirm callers cope with that.
*/
double pairwise_diff_test(double array[] ,l_int32 start , l_int32 end)
{
  double net = 0 ;
  double gross = 0 ;
  double step ;
  l_int32 k ;

  if ( ( end - start ) < MIN_PARTITIONED_STREAM_LEN )
    return 2. ;

  for ( k = start + 1 ; k < end ; k++ )
  {
    step = array[k] - array[k-1] ;
    net += step ;
    gross += fabs(step) ;
  }
  return net / gross ;
}
913 
grey_bw_resolution()914 double grey_bw_resolution()
915 {
916   if ( adr )
917     return (.05*adr<12?.05*adr:12) ;
918   else
919     return min_rate ;
920 }
921 
922 /*
923     test if Rmax and Rmin range is smaller than
924     user specified bw resolution
925     or
926     if Gmin and Gmax range is smaller than grey
927     bw resolution.
928 */
converged()929 l_int32 converged()
930 {
931   int ret_val=0;
932   if ( (converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm  )
933     ret_val=1;
934   else if ( tr_max != 0 && tr_max != tr_min )
935   {
936     if ( tr_max - tr_min <= bw_resol )
937     {
938       converged_rmn_rmx=1;
939       ret_val=1;
940     }
941     else if( tr_max - grey_max <= grey_bw_resolution() &&
942               grey_min - tr_min <= grey_bw_resolution() )
943     {
944       converged_gmn_rmn = 1;
945       converged_gmx_rmx = 1;
946       ret_val=1;
947     }
948   }
949   return ret_val ;
950 }
951 
952 /*
953     Calculate next fleet rate
954     when fleet showed INCREASING trend.
955 */
radj_increasing()956 void radj_increasing()
957 {
958   if ( grey_max != 0 && grey_max >= tr_min )
959   {
960     if ( tr_max - grey_max <= grey_bw_resolution() )
961     {
962       converged_gmx_rmx = 1;
963       exp_flag=0;
964       if ( grey_min || tr_min )
965         radj_notrend() ;
966       else
967       {
968         if ( grey_min < grey_max )
969           tr = grey_min/2. ;
970         else
971           tr = grey_max/2. ;
972       }
973     }
974     else
975      tr = ( tr_max + grey_max)/2. ;
976   }
977   else
978     tr =  (tr_max + tr_min)/2.<min_rate?min_rate:(tr_min+tr_max)/2. ;
979 }
980 
981 
982 /*
983     Calculate next fleet rate
984     when fleet showed NOTREND trend.
985 */
radj_notrend()986 void radj_notrend()
987 {
988   if ( exp_flag )
989      tr =  2*tr>max_rate?max_rate:2*tr ;
990   else
991   {
992     if ( grey_min != 0 && grey_min <= tr_max )
993     {
994       if ( grey_min - tr_min <= grey_bw_resolution() )
995       {
996         converged_gmn_rmn = 1;
997         radj_increasing() ;
998       }
999       else
1000         tr =  (tr_min+grey_min)/2.<min_rate?min_rate:(tr_min+grey_min)/2. ;
1001     }
1002     else
1003       tr =  (tr_max + tr_min)/2.<min_rate?min_rate:(tr_min+tr_max)/2. ;
1004   }
1005 }
1006 
1007 
1008 /*
1009     Calculate next fleet rate
1010     when fleet showed GREY trend.
1011 */
radj_greymax()1012 void radj_greymax()
1013 {
1014   if ( tr_max == 0 )
1015     tr = (tr+.5*tr)<max_rate?(tr+.5*tr):max_rate ;
1016   else if ( tr_max - grey_max <= grey_bw_resolution() )
1017   {
1018     converged_gmx_rmx = 1;
1019     radj_greymin() ;
1020   }
1021   else
1022     tr = ( tr_max + grey_max)/2. ;
1023 }
1024 
1025 /*
1026     Calculate next fleet rate
1027     when fleet showed GREY trend.
1028 */
radj_greymin()1029 void radj_greymin()
1030 {
1031  if ( grey_min - tr_min <= grey_bw_resolution() )
1032  {
1033    converged_gmn_rmn = 1;
1034    radj_greymax() ;
1035  }
1036  else
1037    tr = (tr_min+grey_min)/2.<min_rate?min_rate:(tr_min+grey_min)/2. ;
1038 }
1039 
1040 
1041 /*
1042     dpending upon trend in fleet :-
1043     - update the state variables.
1044     - decide the next fleet rate
1045     return -1 when converged
1046 */
rate_adjustment(l_int32 flag)1047 l_int32 rate_adjustment(l_int32 flag)
1048 {
1049   l_int32 ret_val = 0 ;
1050   if( flag == INCREASING )
1051   {
1052     if ( max_rate_flag)
1053       max_rate_flag=0;
1054     if ( grey_max >= tr )
1055       grey_max = grey_min = 0 ;
1056     tr_max = tr ;
1057     if (!converged_gmx_rmx_tm )
1058     {
1059       if ( !converged() )
1060         radj_increasing() ;
1061       else
1062         ret_val=-1 ; //return -1;
1063     }
1064     else
1065     {
1066       exp_flag = 0 ;
1067       if ( !converged() )
1068         radj_notrend() ;
1069     }
1070   }
1071   else if ( flag == NOTREND )
1072   {
1073     if ( grey_min < tr )
1074       grey_min = 0 ;
1075     if ( grey_max < tr )
1076       grey_max = grey_min = 0 ;
1077     if ( tr > tr_min )
1078       tr_min =  tr ;
1079     if ( !converged_gmn_rmn_tm && !converged() )
1080       radj_notrend() ;
1081     else
1082       ret_val=-1 ; //return -1 ;
1083   }
1084   else if ( flag == GREY )
1085   {
1086     if ( grey_max == 0 && grey_min == 0 )
1087       grey_max =  grey_min = tr ;
1088     if (tr==grey_max || tr>grey_max )
1089     {
1090       grey_max = tr ;
1091       if ( !converged_gmx_rmx_tm )
1092       {
1093         if ( !converged() )
1094           radj_greymax() ;
1095         else
1096           ret_val=-1 ; //return -1 ;
1097       }
1098       else
1099       {
1100         exp_flag = 0 ;
1101         if ( !converged() )
1102           radj_notrend() ;
1103         else
1104           ret_val=-1;
1105       }
1106     }
1107     else if ( tr < grey_min || grey_min == 0  )
1108     {
1109       grey_min = tr ;
1110       if ( !converged() )
1111         radj_greymin() ;
1112       else
1113         ret_val=-1 ; //return -1 ;
1114     }
1115   }
1116 
1117   if (Verbose)
1118   {
1119     printf("  Rmin-Rmax             :: %.2f-%.2fMbps\n",tr_min,tr_max);
1120     printf("  Gmin-Gmax             :: %.2f-%.2fMbps\n",grey_min,grey_max);
1121   }
1122   fprintf(pathload_fp,"  Rmin-Rmax             :: %.2f-%.2fMbps\n",tr_min,tr_max);
1123   fprintf(pathload_fp,"  Gmin-Gmax             :: %.2f-%.2fMbps\n",grey_min,grey_max);
1124 
1125   if ( ret_val == -1 )
1126     return -1 ;
1127   if ( tr >= max_rate )
1128     max_rate_flag++ ;
1129 
1130   if ( max_rate_flag > 1 )
1131      return -1 ;
1132   if ( min_rate_flag > 1 )
1133      return -1 ;
1134   transmission_rate = (l_int32) rint(1000000 * tr ) ;
1135   return 0 ;
1136 }
1137 
1138 /*
1139   calculates fleet param L,T .
1140   calc_param returns -1, if we have
1141   reached to upper/lower limits of the
1142   stream parameters like L,T .
1143   otherwise returns 0 .
1144 */
calc_param()1145 l_int32 calc_param()
1146 {
1147   double tmp_tr ;
1148   l_int32 tmp ;
1149   l_int32 tmp_time_interval;
1150   if (tr < 150 )
1151   {
1152     time_interval  = 80>min_time_interval?80:min_time_interval ;
1153     cur_pkt_sz=rint(tr*time_interval/8.) - 28;
1154     if ( cur_pkt_sz < MIN_PKT_SZ )
1155     {
1156       cur_pkt_sz = MIN_PKT_SZ ;
1157       time_interval =rint ((cur_pkt_sz + 28)*8./tr) ;
1158       tr = ( cur_pkt_sz + 28 )*8. /time_interval ;
1159     }
1160     else if ( cur_pkt_sz > max_pkt_sz )
1161     {
1162       cur_pkt_sz = max_pkt_sz;
1163       time_interval = min_time_interval ;
1164       tmp_tr = ( cur_pkt_sz + 28 )*8. /time_interval ;
1165       if ( equal(tr,tmp_tr))
1166           tr = tmp_tr;
1167       else
1168         return -1 ;
1169     }
1170   }
1171   else if ( tr < 600 )
1172   {
1173     tmp_tr = tr ;
1174     tmp_time_interval = rint(( max_pkt_sz + 28 )* 8 / tr) ;
1175     if ( cur_pkt_sz == max_pkt_sz && tmp_time_interval == time_interval )
1176       return -1 ;
1177     time_interval = tmp_time_interval ;
1178     tmp=rint(tr*time_interval/8.)-28;
1179     cur_pkt_sz=tmp<max_pkt_sz?tmp:max_pkt_sz;
1180     tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
1181     if ((tr_min && (equal(tr,tr_min) || tr<tr_min))
1182         || (grey_max && tmp_tr>grey_max && (equal(tr,grey_max) || tr<grey_max)))
1183     {
1184       do
1185       {
1186         --time_interval;
1187         cur_pkt_sz=rint(tr*time_interval/8.)-28;
1188       }while (cur_pkt_sz > max_pkt_sz);
1189       tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
1190     }
1191   }
1192   else
1193   {
1194     cur_pkt_sz = max_pkt_sz ;
1195     time_interval = rint(( cur_pkt_sz + 28 )* 8 / tr) ;
1196     tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
1197     if ((tr_min && (equal(tr,tr_min) || tr<tr_min)) )
1198     {
1199       return -1 ;
1200     }
1201     if( equal(tr,tr_max) )
1202     {
1203       tr_max = tr ;
1204       if ( grey_max )
1205       {
1206         converged_gmx_rmx_tm=1;
1207         if ( !converged_gmn_rmn && !converged_gmn_rmn_tm )
1208           radj_notrend();
1209         else return -1 ;
1210       }
1211       else
1212       {
1213         converged_rmn_rmx=1;
1214         return -1 ;
1215       }
1216     }
1217   }
1218   return 0 ;
1219 }
1220 
1221 /*
1222   splits stream iff sender sent packets more than
1223   time_interval+1000 usec apart.
1224 */
eliminate_sndr_side_CS(double sndr_time_stamp[],l_int32 split_owd[])1225 l_int32 eliminate_sndr_side_CS (double sndr_time_stamp[], l_int32 split_owd[])
1226 {
1227   l_int32 j = 0,k=0;
1228   l_int32 cs_threshold;
1229 
1230   cs_threshold = 2*time_interval>time_interval+1000?2*time_interval:time_interval+1000;
1231   for ( k = 0 ; k < stream_len-1 ; k++ )
1232   {
1233     if ( sndr_time_stamp[k] == 0 || sndr_time_stamp[k+1] == 0 )
1234        continue;
1235     else if ((sndr_time_stamp[k+1]-sndr_time_stamp[k]) > cs_threshold)
1236       split_owd[j++] = k;
1237   }
1238   return j ;
1239 }
1240 
1241 /*
1242   discards owd of packets received when
1243   receiver was NOT running.
1244 */
eliminate_rcvr_side_CS(double rcvr_time_stamp[],double owd[],double owdfortd[],l_int32 low,l_int32 high,l_int32 * num_rcvr_cs,l_int32 * tmp_b2b)1245 l_int32 eliminate_rcvr_side_CS ( double rcvr_time_stamp[] , double owd[],double owdfortd[], l_int32 low,l_int32 high,l_int32 *num_rcvr_cs,l_int32 *tmp_b2b )
1246 {
1247   l_int32 b2b_pkt[MAX_STREAM_LEN] ;
1248   l_int32 i,k=0 ;
1249   l_int32 len=0;
1250   l_int32 min_gap;
1251 
1252   min_gap = MIN_TIME_INTERVAL > 1.5*rcv_latency ? MIN_TIME_INTERVAL :2.5*rcv_latency ;
1253   for ( i = low ; i <= high  ; i++ )
1254   {
1255     if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 )
1256       continue ;
1257     else if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])> min_gap)
1258       owdfortd[len++] = owd[i];
1259     else
1260       b2b_pkt[k++] = i ;
1261   }
1262 
1263   /* go through discarded list and count b2b discards as 1 CS instance */
1264   for (i=1;i<k;i++)
1265     if ( b2b_pkt[i]-b2b_pkt[i-1] != 1)
1266       (*num_rcvr_cs)++;
1267   *tmp_b2b += k;
1268   return len ;
1269 }
1270 
1271 /* eliminates packets received b2b due to IC */
eliminate_b2b_pkt_ic(double rcvr_time_stamp[],double owd[],double owdfortd[],l_int32 low,l_int32 high,l_int32 * num_rcvr_cs,l_int32 * tmp_b2b)1272 l_int32 eliminate_b2b_pkt_ic ( double rcvr_time_stamp[] , double owd[],double owdfortd[], l_int32 low,l_int32 high,l_int32 *num_rcvr_cs,l_int32 *tmp_b2b )
1273 {
1274   l_int32 b2b_pkt[MAX_STREAM_LEN] ;
1275   l_int32 i,k=0 ;
1276   l_int32 len=0;
1277   l_int32 min_gap;
1278   l_int32 tmp=0;
1279 
1280   min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL :3*rcv_latency ;
1281   for ( i = low ; i <= high  ; i++ )
1282   {
1283     if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 )
1284       continue ;
1285 
1286     //fprintf(stderr,"i %d  owd %.2f dispersion %.2f",i, owd[i],rcvr_time_stamp[i+1]- rcvr_time_stamp[i]);
1287     if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])< min_gap)
1288     {
1289       b2b_pkt[k++] = i ;
1290       tmp++;
1291       //fprintf(stderr," b\n");
1292     }
1293     else
1294     {
1295       if ( tmp >= 3 )
1296       {
1297         //fprintf(stderr," j\n");
1298         tmp=0;
1299         owdfortd[len++] = owd[i];
1300       }
1301     }
1302   }
1303   return len ;
1304 }
1305 
1306 /* Adjust offset to zero again  */
adjust_offset_to_zero(double owd[],l_int32 len)1307 void adjust_offset_to_zero(double owd[], l_int32 len)
1308 {
1309     l_int32 owd_min = 0;
1310     l_int32 i ;
1311     for (i=0; i< len; i++) {
1312         if ( owd_min == 0 && owd[i] != 0 ) owd_min=owd[i];
1313         else if (owd_min != 0 && owd[i] != 0 && owd[i]<owd_min) owd_min=owd[i];
1314     }
1315 
1316     for (i=0; i< len; i++) {
1317         if ( owd[i] != 0 )
1318             owd[i] -= owd_min;
1319     }
1320 }
1321 
1322 #define INCR    1
1323 #define NOTR    2
1324 #define DISCARD 3
1325 #define UNCL    4
get_pct_trend(double pct_metric[],l_int32 pct_trend[],l_int32 pct_result_cnt)1326 void get_pct_trend(double pct_metric[], l_int32 pct_trend[], l_int32 pct_result_cnt )
1327 {
1328  l_int32 i ;
1329  for (i=0; i < pct_result_cnt;i++ )
1330  {
1331    pct_trend[i] = UNCL ;
1332    if ( pct_metric[i] == -1  )
1333    {
1334      if (Verbose)
1335        printf("d");
1336      fprintf(pathload_fp,"d");
1337      pct_trend[i] = DISCARD ;
1338    }
1339    else if ( pct_metric[i] > 1.1 * PCT_THRESHOLD )
1340    {
1341      if (Verbose)
1342        printf("I");
1343      fprintf(pathload_fp,"I");
1344      pct_trend[i] = INCR ;
1345    }
1346    else if ( pct_metric[i] < .9 * PCT_THRESHOLD )
1347    {
1348      if (Verbose)
1349        printf("N");
1350      fprintf(pathload_fp,"N");
1351      pct_trend[i] = NOTR ;
1352    }
1353    else if(pct_metric[i] <= PCT_THRESHOLD*1.1 && pct_metric[i] >= PCT_THRESHOLD*.9 )
1354    {
1355      if (Verbose)
1356        printf("U");
1357      fprintf(pathload_fp,"U");
1358      pct_trend[i] = UNCL ;
1359    }
1360  }
1361  if (Verbose)
1362    printf("\n");
1363  fprintf(pathload_fp,"\n");
1364 }
1365 
/*
  get_pdt_trend : classify the PDT metric of every stream.

  pdt_metric[]   : metric per stream; 2 marks a discarded stream.
  pdt_trend[]    : receives DISCARD, INCR (metric above
                   1.1*PDT_THRESHOLD), NOTR (below .9*PDT_THRESHOLD)
                   or UNCL (anything else).
  pdt_result_cnt : number of streams.

  One letter per stream (d, I, N or U) followed by a newline is
  written to pathload_fp, and to stdout as well when Verbose is set.

  Every entry now starts out as UNCL, as get_pct_trend() does, so a
  metric that matches none of the ranges (e.g. NaN) no longer leaves
  pdt_trend[i] uninitialised for the caller to read.
*/
void get_pdt_trend(double pdt_metric[], l_int32 pdt_trend[], l_int32 pdt_result_cnt )
{
 l_int32 i ;
 for (i=0; i < pdt_result_cnt;i++ )
 {
   pdt_trend[i] = UNCL ;
   if ( pdt_metric[i] == 2  )
   {
     if (Verbose)
       printf("d");
     fprintf(pathload_fp,"d");
     pdt_trend[i] = DISCARD ;
   }
   else if ( pdt_metric[i] > 1.1 * PDT_THRESHOLD )
   {
     if (Verbose)
       printf("I");
     fprintf(pathload_fp,"I");
     pdt_trend[i] = INCR ;
   }
   else if ( pdt_metric[i] < .9 * PDT_THRESHOLD )
   {
     if (Verbose)
       printf("N");
     fprintf(pathload_fp,"N");
     pdt_trend[i] = NOTR ;
   }
   else if ( pdt_metric[i] <= PDT_THRESHOLD*1.1 && pdt_metric[i] >= PDT_THRESHOLD*.9 )
   {
     if (Verbose)
       printf("U");
     fprintf(pathload_fp,"U");
     pdt_trend[i] = UNCL ;
   }
 }
 if (Verbose)
   printf("\n");
 fprintf(pathload_fp,"\n");
}
1404 
1405 /*
1406   returns : trend in fleet or -1 if more than 50% of stream were discarded
1407 */
aggregate_trend_result()1408 l_int32 aggregate_trend_result()
1409 {
1410   l_int32 total=0 ,i_cnt = 0, n_cnt = 0;
1411   l_int32 num_dscrd_strm=0;
1412   l_int32 i=0;
1413   l_int32 pct_trend[TREND_ARRAY_LEN] , pdt_trend[TREND_ARRAY_LEN] ;
1414 
1415   if (Verbose)
1416     printf("  PCT metric/stream[%2d] :: ",trend_idx);
1417   fprintf(pathload_fp,"  PCT metric/stream[%2d] :: ",trend_idx);
1418   for (i=0; i < trend_idx;i++ )
1419   {
1420     if (Verbose)
1421       printf("%3.2f:",pct_metric[i]);
1422     fprintf(pathload_fp,"%3.2f:",pct_metric[i]);
1423   }
1424   if (Verbose)
1425     printf("\n");
1426   fprintf(pathload_fp,"\n");
1427   if (Verbose)
1428     printf("  PDT metric/stream[%2d] :: ",trend_idx);
1429   fprintf(pathload_fp,"  PDT metric/stream[%2d] :: ",trend_idx);
1430   for (i=0; i < trend_idx;i++ )
1431   {
1432     if (Verbose)
1433       printf("%3.2f:",pdt_metric[i]);
1434     fprintf(pathload_fp,"%3.2f:",pdt_metric[i]);
1435   }
1436   if (Verbose)
1437     printf("\n");
1438   fprintf(pathload_fp,"\n");
1439   if (Verbose)
1440     printf("  PCT Trend/stream [%2d] :: ",trend_idx);
1441   fprintf(pathload_fp,"  PCT Trend/stream [%2d] :: ",trend_idx);
1442   get_pct_trend(pct_metric,pct_trend,trend_idx);
1443   if (Verbose)
1444     printf("  PDT Trend/stream [%2d] :: ",trend_idx);
1445   fprintf(pathload_fp,"  PDT Trend/stream [%2d] :: ",trend_idx);
1446   get_pdt_trend(pdt_metric,pdt_trend,trend_idx);
1447 
1448   if (Verbose)
1449     printf("  Trend per stream [%2d] :: ",trend_idx);
1450   fprintf(pathload_fp,"  Trend per stream [%2d] :: ",trend_idx);
1451   for (i=0; i < trend_idx;i++ )
1452   {
1453     if ( pct_trend[i] == DISCARD || pdt_trend[i] == DISCARD )
1454     {
1455        if (Verbose)
1456          printf("d");
1457        fprintf(pathload_fp,"d");
1458        num_dscrd_strm++ ;
1459     }
1460     else if ( pct_trend[i] == INCR &&  pdt_trend[i] == INCR )
1461     {
1462        if (Verbose)
1463          printf("I");
1464        fprintf(pathload_fp,"I");
1465        i_cnt++;
1466     }
1467     else if ( pct_trend[i] == NOTR && pdt_trend[i] == NOTR )
1468     {
1469        if (Verbose)
1470          printf("N");
1471        fprintf(pathload_fp,"N");
1472        n_cnt++;
1473     }
1474     else if ( pct_trend[i] == INCR && pdt_trend[i] == UNCL )
1475     {
1476        if (Verbose)
1477          printf("I");
1478        fprintf(pathload_fp,"I");
1479        i_cnt++;
1480     }
1481     else if ( pct_trend[i] == NOTR && pdt_trend[i] == UNCL )
1482     {
1483        if (Verbose)
1484          printf("N");
1485        fprintf(pathload_fp,"N");
1486        n_cnt++;
1487     }
1488     else if ( pdt_trend[i] == INCR && pct_trend[i] == UNCL )
1489     {
1490        if (Verbose)
1491          printf("I");
1492        fprintf(pathload_fp,"I");
1493        i_cnt++;
1494     }
1495     else if ( pdt_trend[i] == NOTR && pct_trend[i] == UNCL )
1496     {
1497        if (Verbose)
1498          printf("N");
1499        fprintf(pathload_fp,"N");
1500        n_cnt++ ;
1501     }
1502     else
1503     {
1504        if (Verbose)
1505          printf("U");
1506        fprintf(pathload_fp,"U");
1507     }
1508     total++ ;
1509   }
1510   if (Verbose) printf("\n");
1511   fprintf(pathload_fp,"\n");
1512 
1513   /* check whether number of usable streams is
1514      atleast 50% of requested number of streams */
1515   total-=num_dscrd_strm ;
1516   if ( total < num_stream/2 && !slow && !interrupt_coalescence)
1517   {
1518     bad_fleet_cs = 1 ;
1519     retry_fleet_cnt_cs++  ;
1520     return -1 ;
1521   }
1522   else
1523   {
1524     bad_fleet_cs = 0 ;
1525     retry_fleet_cnt_cs=0;
1526   }
1527 
1528   if( (double)i_cnt/(total) >= AGGREGATE_THRESHOLD )
1529   {
1530     if (Verbose)
1531       printf("  Aggregate trend       :: INCREASING\n");
1532     fprintf(pathload_fp,"  Aggregate trend       :: INCREASING\n");
1533     return INCREASING ;
1534   }
1535   else if( (double)n_cnt/(total) >= AGGREGATE_THRESHOLD )
1536   {
1537     if (Verbose)
1538       printf("  Aggregate trend       :: NO TREND\n");
1539     fprintf(pathload_fp,"  Aggregate trend       :: NO TREND\n");
1540     return NOTREND ;
1541   }
1542   else
1543   {
1544     if (Verbose)
1545       printf("  Aggregate trend       :: GREY\n");
1546     fprintf(pathload_fp,"  Aggregate trend       :: GREY\n");
1547     return GREY ;
1548   }
1549 }
1550 
get_sndr_time_interval(double snd_time[],double * sum)1551 l_int32 get_sndr_time_interval(double snd_time[],double *sum)
1552 {
1553   l_int32 k,j=0,new_j=0;
1554   double ordered[MAX_STREAM_LEN] ;
1555   double ltime_interval[MAX_STREAM_LEN] ;
1556   for ( k = 0; k < stream_len-1; k++ )
1557   {
1558     if ( snd_time[k] == 0 || snd_time[k+1] == 0 )
1559       continue;
1560     else
1561       ltime_interval[j++] = snd_time[k+1] - snd_time[k] ;
1562   }
1563   order_dbl(ltime_interval, ordered , 0, j ) ;
1564   /* discard the top 15% as outliers  */
1565   new_j = j - rint(j*.15) ;
1566   for ( k = 0 ; k < new_j ; k++ )
1567     *sum += ordered[k] ;
1568   return new_j ;
1569 }
1570 
get_sending_rate()1571 void get_sending_rate()
1572 {
1573  time_interval = snd_time_interval/num;
1574  cur_req_rate = tr ;
1575  cur_actual_rate = (28 + cur_pkt_sz) * 8. / time_interval ;
1576 
1577  if( !equal(cur_req_rate, cur_actual_rate)  )
1578  {
1579    if( !grey_max && !grey_min )
1580    {
1581      if( tr_min && tr_max && (less_than(cur_actual_rate,tr_min)||equal(cur_actual_rate,tr_min)))
1582        converged_rmn_rmx_tm = 1;
1583      if( tr_min && tr_max && (less_than(tr_max,cur_actual_rate)||equal(tr_max,cur_actual_rate)))
1584        converged_rmn_rmx_tm = 1;
1585 
1586    }
1587    else if ( cur_req_rate < tr_max && cur_req_rate > grey_max )
1588    {
1589      if( !(less_than(cur_actual_rate,tr_max)&&grtr_than(cur_actual_rate,grey_max)) )
1590        converged_gmx_rmx_tm = 1;
1591    }
1592    else if ( cur_req_rate < grey_min && cur_req_rate > tr_min )
1593    {
1594      if( !(less_than(cur_actual_rate,grey_min) && grtr_than(cur_actual_rate,tr_min)) )
1595         converged_gmn_rmn_tm = 1;
1596    }
1597   }
1598 
1599   tr = cur_actual_rate ;
1600   transmission_rate = (l_int32) rint(1000000 * tr) ;
1601   if(Verbose)
1602     printf("  Fleet Parameter(act)  :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate, cur_pkt_sz , stream_len,time_interval);
1603   fprintf(pathload_fp,"  Fleet Parameter(act)  :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate,cur_pkt_sz,stream_len,time_interval);
1604   snd_time_interval=0;
1605   num=0;
1606 }
1607 
terminate_gracefully(struct timeval exp_start_time)1608 void terminate_gracefully(struct timeval exp_start_time)
1609 {
1610   l_int32 ctr_code;
1611   char ctr_buff[8],buff[26];
1612   struct timeval exp_end_time;
1613   double min=0,max=0 ;
1614 
1615   ctr_code = TERMINATE | CTR_CODE;
1616   send_ctr_mesg(ctr_buff, ctr_code);
1617 
1618   gettimeofday(&exp_end_time, NULL);
1619   strncpy(buff, ctime(&(exp_end_time.tv_sec)), 24);
1620   buff[24] = '\0';
1621   if (verbose || Verbose)
1622     printf("\n\t*****  RESULT *****\n");
1623   fprintf(pathload_fp,"\n\t*****  RESULT *****\n");
1624 
1625   if (netlog)
1626     netlogger() ;
1627 
1628   if ( min_rate_flag )
1629   {
1630     if (verbose || Verbose)
1631     {
1632       printf("Avail-bw < minimum sending rate.\n");
1633       printf("Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n");
1634     }
1635     fprintf(pathload_fp,"Avail-bw < minimum sending rate.\n");
1636     fprintf(pathload_fp,"Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n");
1637   }
1638   else if ( max_rate_flag && !interrupt_coalescence )
1639   {
1640     if (verbose || Verbose)
1641     {
1642       printf("Avail-bw > maximum sending rate.\n");
1643       if ( tr_min)
1644         printf("Avail-bw > %.2f (Mbps)\n", tr_min);
1645     }
1646     fprintf(pathload_fp,"Avail-bw > maximum sending rate.\n");
1647     if ( tr_min)
1648       fprintf(pathload_fp,"Avail-bw > %.2f (Mbps)\n", tr_min);
1649   }
1650   else if (bad_fleet_cs && !interrupt_coalescence)
1651   {
1652     if (verbose || Verbose)
1653       printf("Measurement terminated due to frequent CS @ sender/receiver.\n");
1654     fprintf(pathload_fp,"Measurement terminated due to frequent CS @ sender/receiver.\n");
1655     if ((tr_min&& tr_max) || (grey_min&&grey_max))
1656     {
1657       if ( grey_min&& grey_max)
1658       {
1659         min = grey_min ; max = grey_max ;
1660       }
1661       else
1662       {
1663         min = tr_min ;max = tr_max ;
1664       }
1665       if (verbose || Verbose)
1666       {
1667         printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1668         printf("Measurements finished at %s \n",  buff);
1669         printf("Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
1670       }
1671       fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1672       fprintf(pathload_fp,"Measurements finished at %s \n",  buff);
1673       fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
1674     }
1675   }
1676   else
1677   {
1678     if ( !interrupt_coalescence && ((converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm ))
1679     {
1680       if (Verbose)
1681         printf("Actual probing rate != desired probing rate.\n");
1682       fprintf(pathload_fp,"Actual probing rate != desired probing rate.\n");
1683       if ( converged_rmn_rmx_tm )
1684       {
1685         min = tr_min ; max = tr_max;
1686       }
1687       else
1688       {
1689         min = grey_min ; max = grey_max ;
1690       }
1691     }
1692     else if ( !interrupt_coalescence && converged_rmn_rmx )
1693     {
1694       if (Verbose)
1695         printf("User specified bandwidth resolution achieved\n");
1696       fprintf(pathload_fp,"User specified bandwidth resolution achieved\n");
1697       min = tr_min ; max = tr_max ;
1698     }
1699     else if ( !interrupt_coalescence && converged_gmn_rmn && converged_gmx_rmx )
1700     {
1701       if (Verbose)
1702         printf("Exiting due to grey bw resolution\n");
1703       fprintf(pathload_fp,"Exiting due to grey bw resolution\n");
1704       min = grey_min ; max = grey_max;
1705     }
1706     else
1707     {
1708       min = tr_min ; max = tr_max;
1709     }
1710 
1711     if (verbose||Verbose)
1712     {
1713       if ( lower_bound)
1714       {
1715         printf("Receiver NIC has interrupt coalescence enabled\n");
1716         printf("Available bandwidth is greater than %.2f (Mbps)\n", min);
1717       }
1718       else
1719         printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1720 
1721       printf("Measurements finished at %s \n",  buff);
1722       printf("Measurement latency is %.2f sec \n",time_to_us_delta(exp_start_time,exp_end_time)/1000000);
1723     }
1724 
1725     if ( lower_bound)
1726     {
1727       fprintf(pathload_fp,"Receiver NIC has interrupt coalescence enabled\n");
1728       fprintf(pathload_fp,"Available bandwidth is greater than %.2f (Mbps)\n", min);
1729     }
1730     else
1731       fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1732 
1733     fprintf(pathload_fp,"Measurements finished at %s \n",  buff);
1734     fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
1735   }
1736 
1737   if (netlog)
1738     fclose(netlog_fp);
1739   fclose(pathload_fp);
1740   close(sock_tcp);
1741   exit(0);
1742 }
1743 
netlogger()1744 void netlogger()
1745 {
1746     struct tm *tm;
1747     struct hostent *rcv_host, *snd_host;
1748     char rcv_name[256];
1749     struct timeval curr_time;
1750 
1751     gettimeofday(&curr_time,NULL);
1752     tm = gmtime(&curr_time.tv_sec);
1753     fprintf(netlog_fp,"DATE=%4d",tm->tm_year+1900);
1754     print_time(netlog_fp,tm->tm_mon+1);
1755     print_time(netlog_fp,tm->tm_mday);
1756     print_time(netlog_fp,tm->tm_hour);
1757     print_time(netlog_fp,tm->tm_min);
1758     if (tm->tm_sec <10) {
1759       fprintf(netlog_fp,"0");
1760       fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0);
1761     }
1762     else{
1763       fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0);
1764     }
1765     gethostname(rcv_name, 255);
1766     rcv_host = gethostbyname(rcv_name);
1767     if(strcmp(rcv_name, "\0")!=0) fprintf(netlog_fp," HOST=%s",rcv_host->h_name);
1768     else fprintf(netlog_fp," HOST=NO_NAME");
1769     fprintf(netlog_fp," PROG=pathload");
1770     fprintf(netlog_fp," LVL=Usage");
1771     if ((snd_host = gethostbyname(hostname)) == 0) {
1772         snd_host = gethostbyaddr(hostname,256,AF_INET);
1773     }
1774     fprintf(netlog_fp," PATHLOAD.SNDR=%s",snd_host->h_name);
1775     fprintf(netlog_fp," PATHLOAD.ABWL=%.1fMbps",tr_min);
1776     fprintf(netlog_fp," PATHLOAD.ABWH=%.1fMbps\n",tr_max);
1777     fclose(netlog_fp);
1778 }
1779 
1780 /* prl_int32 time */
print_time(FILE * fp,l_int32 time)1781 void print_time(FILE *fp, l_int32 time)
1782 {
1783   if( time<10){
1784     fprintf(fp,"0");
1785     fprintf(fp,"%1d",time);
1786   }
1787   else{
1788     fprintf(fp,"%2d",time);
1789   }
1790 }
1791 
less_than(double a,double b)1792 l_int32 less_than(double a, double b)
1793 {
1794   if ( !equal(a,b) && a < b)
1795     return 1;
1796   else
1797     return 0;
1798 }
1799 
grtr_than(double a,double b)1800 l_int32 grtr_than(double a, double b)
1801 {
1802   if ( !equal(a,b) && a > b)
1803     return 1;
1804   else
1805     return 0;
1806 }
1807 
1808 /*
1809    if a approx-equal b, return 1
1810    else 0
1811 */
equal(double a,double b)1812 l_int32 equal(double a , double b)
1813 {
1814   l_int32 maxdiff ;
1815   if ( a<b?a:b < 500 ) maxdiff = 2.5 ;
1816   else maxdiff = 5 ;
1817   if ( abs( a - b ) / b <= .02  && abs(a-b) < maxdiff )
1818     return 1 ;
1819   else
1820     return 0;
1821 }
1822 
1823 
/*
 *  help : print the command line usage of pathload_rcv to stderr
 *  and exit with status 0.
 *
 *  The usage line is built from adjacent string literals instead of a
 *  backslash continuation, which had glued "[-N <filename>]" and
 *  "[-w <bw_resol>]" together without a separating space; the
 *  "quite mode" typo is corrected as well.
 */
void help()
{
  fprintf(stderr, "usage: pathload_rcv [-q|-v] [-o|-O <filename>] [-N <filename>] "
                  "[-w <bw_resol>] [-h|-H] -s <sender>\n");
  fprintf (stderr,"-s        : hostname/ipaddress of sender\n");
  fprintf (stderr,"-q        : quiet mode\n");
  fprintf (stderr,"-v        : verbose mode\n");
  fprintf (stderr,"-w        : user specified bw resolution\n");
  fprintf (stderr,"-o <file> : write log in user specified file [default is pathload.log]\n");
  fprintf (stderr,"-O <file> : append log in user specified file [default is pathload.log]\n");
  fprintf (stderr,"-N <file> : print output in netlogger format to <file>\n");
  fprintf (stderr,"-h|H      : print this help and exit\n");
  exit(0);
}
1841