1 /*
2 This file is part of pathload.
3
4 pathload is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
8
9 pathload is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with pathload; if not, write to the Free Software
16 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18
19 /*-------------------------------------------------
20 pathload : an end-to-end available bandwidth
21 estimation tool
22 Author : Manish Jain ( jain@cc.gatech.edu )
23 Constantinos Dovrolis (dovrolis@cc.gatech.edu )
24 Release : Ver 1.3.2
25 Support : This work was supported by the SciDAC
26 program of the US department
27 --------------------------------------------------*/
28
29 /*
30 * $Header: /net/cvs/bwtest/pathload/pathload_rcv_func.c,v 1.294 2006/05/19 20:21:15 jain Exp $
31 */
32
33 #include "pathload_gbls.h"
34 #include "pathload_rcv.h"
35
recvfrom_latency(struct sockaddr_in rcv_udp_addr)36 l_int32 recvfrom_latency(struct sockaddr_in rcv_udp_addr)
37 {
38 char *random_data;
39 float min_OSdelta[50], ord_min_OSdelta[50];
40 l_int32 j ;
41 struct timeval current_time, first_time ;
42
43 if ( (random_data = malloc(max_pkt_sz*sizeof(char)) ) == NULL )
44 {
45 printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz);
46 exit(-1);
47 }
48 srandom(getpid()); /* Create random payload; does it matter? */
49 for (j=0; j<max_pkt_sz-1; j++) random_data[j]=(char)(random()&0x000000ff);
50
51 for (j=0; j<50; j++)
52 {
53 if ( sendto(sock_udp, random_data, max_pkt_sz, 0,
54 (struct sockaddr*)&rcv_udp_addr,sizeof(rcv_udp_addr)) == -1)
55 perror("recvfrom_latency");
56 gettimeofday(&first_time, NULL);
57 recvfrom(sock_udp, random_data, max_pkt_sz, 0, NULL, NULL);
58 gettimeofday(¤t_time, NULL);
59 min_OSdelta[j]= time_to_us_delta(first_time, current_time);
60 }
61 /* Use median of measured latencies to avoid outliers */
62 order_float(min_OSdelta, ord_min_OSdelta,0,50);
63 free(random_data);
64 return((l_int32)ord_min_OSdelta[25]);
65 }
66
67
get_adr()68 double get_adr()
69 {
70 struct timeval select_tv,arrv_tv[MAX_STREAM_LEN] ;
71 double delta ;
72 double bw_msr = 0;
73 double bad_bw_msr[10] ;
74 int num_bad_train=0 ;
75 int first = 1 ;
76 double sum =0 ;
77 l_int32 exp_train_id ;
78 l_int32 bad_train = 1;
79 l_int32 retry = 0 ;
80 l_int32 ctr_code ;
81 l_int32 ctr_msg_rcvd ;
82 l_int32 train_len=0;
83 l_int32 last=0,i;
84 l_int32 spacecnt=24 ;
85 char ctr_buff[8];
86 l_int32 num_burst;
87
88 if (Verbose)
89 printf(" ADR [");
90 fflush(stdout);
91 fprintf(pathload_fp," ADR [");
92 fflush(pathload_fp);
93 ctr_code = SEND_TRAIN | CTR_CODE ;
94 send_ctr_mesg(ctr_buff, ctr_code);
95 exp_train_id = 0 ;
96 for(i=0;i<100;i++)
97 {
98 arrv_tv[i].tv_sec=0;
99 arrv_tv[i].tv_usec=0;
100 }
101 while ( retry < MAX_TRAIN && bad_train )
102 {
103 if ( train_len == 5)
104 train_len = 3;
105 else
106 train_len = TRAIN_LEN - exp_train_id*15;
107 if (Verbose)
108 printf(".");
109 fflush(stdout);
110 fprintf(pathload_fp,".");
111 spacecnt--;
112 ctr_msg_rcvd = 0 ;
113 bad_train = recv_train(exp_train_id, arrv_tv, train_len);
114 /* Compute dispersion and bandwidth measurement */
115 if (!bad_train)
116 {
117 num_burst=0;
118 interrupt_coalescence=check_intr_coalescence(arrv_tv,train_len,&num_burst);
119 last=train_len;
120 while(!arrv_tv[last].tv_sec) --last;
121 delta = time_to_us_delta(arrv_tv[1], arrv_tv[last]);
122 bw_msr = ((28+max_pkt_sz) << 3) * (last-1) / delta;
123 /* tell sender that it was agood train.*/
124 ctr_code = GOOD_TRAIN | CTR_CODE ;
125 send_ctr_mesg(ctr_buff, ctr_code ) ;
126 }
127 else
128 {
129 retry++ ;
130 /* wait for atleast 10msec before requesting another train */
131 last=train_len;
132 while(!arrv_tv[last].tv_sec) --last;
133 first=1 ;
134 while(!arrv_tv[first].tv_sec) ++first ;
135 delta = time_to_us_delta(arrv_tv[first], arrv_tv[last]);
136 bad_bw_msr[num_bad_train++] = ((28+max_pkt_sz) << 3) * (last-first-1) / delta;
137 select_tv.tv_sec=0;select_tv.tv_usec=10000;
138 select(0,NULL,NULL,NULL,&select_tv);
139 ctr_code = BAD_TRAIN | CTR_CODE ;
140 send_ctr_mesg(ctr_buff, ctr_code ) ;
141 exp_train_id++ ;
142 }
143 }
144
145 if (Verbose)
146 {
147 i = spacecnt;
148 putchar(']');
149 while(--i>0)putchar(' ');
150 printf(":: ");
151 }
152 fputc(']',pathload_fp);
153 while(--spacecnt>0)fputc(' ',pathload_fp);
154 fprintf(pathload_fp,":: ");
155 if ( !bad_train)
156 {
157 if(Verbose)
158 printf("%.2fMbps\n", bw_msr ) ;
159 fprintf(pathload_fp,"%.2fMbps\n", bw_msr ) ;
160 }
161 else
162 {
163 for ( i=0;i<num_bad_train;i++)
164 if ( finite(bad_bw_msr[i]))
165 sum += bad_bw_msr[i] ;
166 bw_msr = sum/num_bad_train ;
167 if(Verbose)
168 printf("%.2fMbps (I)\n", bw_msr ) ;
169 fprintf(pathload_fp,"%.2fMbps (I)\n", bw_msr ) ;
170 }
171 return bw_msr ;
172 }
173
174 /* Receive a complete packet train from the sender */
recv_train(l_int32 exp_train_id,struct timeval * time,l_int32 train_len)175 l_int32 recv_train( l_int32 exp_train_id, struct timeval *time,l_int32 train_len)
176 {
177 struct sigaction sigstruct ;
178 struct timeval current_time;
179 struct timeval select_tv;
180 fd_set readset;
181 l_int32 ret_val ;
182 l_int32 pack_id , exp_pack_id ;
183 l_int32 bad_train = 0 ;
184 l_int32 train_id ;
185 l_int32 rcvd=0;
186 char *pack_buf ;
187 char ctr_buff[8];
188 #ifdef THRLIB
189 thr_arg arg ;
190 pthread_t tid;
191 pthread_attr_t attr ;
192 #endif
193 exp_pack_id=0;
194
195 if ( ( pack_buf = malloc(max_pkt_sz*sizeof(char))) == NULL )
196 {
197 printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz);
198 exit(-1);
199 }
200
201 sigstruct.sa_handler = sig_sigusr1 ;
202 sigemptyset(&sigstruct.sa_mask);
203 sigstruct.sa_flags = 0 ;
204 #ifdef SA_INTERRUPT
205 sigstruct.sa_flags |= SA_INTERRUPT ;
206 #endif
207 sigaction(SIGUSR1 , &sigstruct,NULL );
208
209 #ifdef THRLIB
210 arg.finished_stream=0;
211 arg.ptid = pthread_self() ;
212 pthread_attr_init(&attr);
213 pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
214 if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 )
215 {
216 perror("recv_train::pthread_create");
217 fprintf(stdout,"Failed to create thread. exiting...\n");
218 fprintf(pathload_fp,"Failed to create thread. exiting...\n");
219 exit(-1);
220 }
221 #endif
222
223 do
224 {
225 #ifndef THRLIB
226 FD_ZERO(&readset);
227 FD_SET(sock_tcp,&readset);
228 FD_SET(sock_udp,&readset);
229 select_tv.tv_sec=1000;select_tv.tv_usec=0;
230 if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 )
231 {
232 if (FD_ISSET(sock_udp,&readset) )
233 {
234 #endif
235 if (recvfrom(sock_udp, pack_buf, max_pkt_sz, 0, NULL, NULL) != -1)
236 {
237 gettimeofday(¤t_time, NULL);
238 memcpy(&train_id, pack_buf, sizeof(l_int32));
239 train_id = ntohl(train_id) ;
240 memcpy(&pack_id, pack_buf+sizeof(l_int32), sizeof(l_int32));
241 pack_id=ntohl(pack_id);
242 if (train_id == exp_train_id && pack_id==exp_pack_id )
243 {
244 rcvd++;
245 time[pack_id] = current_time ;
246 exp_pack_id++;
247 }
248 else bad_train=1;
249 }
250 #ifndef THRLIB
251 } // end of FD_ISSET
252
253 if ( FD_ISSET(sock_tcp,&readset) )
254 {
255 /* check the control connection.*/
256 if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 )
257 {
258 if ( (((ret_val & CTR_CODE) >> 31) == 1) &&
259 ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) )
260 {
261 break;
262 }
263 }
264 }
265 } // end of select
266 } while (1);
267 #else
268 } while (!arg.finished_stream);
269 #endif
270
271 if ( rcvd != train_len+1 ) bad_train=1;
272 gettimeofday(&time[pack_id+1], NULL);
273 sigstruct.sa_handler = SIG_DFL ;
274 sigemptyset(&sigstruct.sa_mask);
275 sigstruct.sa_flags = 0 ;
276 sigaction(SIGUSR1 , &sigstruct,NULL );
277 free(pack_buf);
278 return bad_train ;
279 }
280
check_intr_coalescence(struct timeval time[],l_int32 len,l_int32 * burst)281 l_int32 check_intr_coalescence(struct timeval time[],l_int32 len, l_int32 *burst)
282 {
283 double delta[MAX_STREAM_LEN];
284 l_int32 b2b=0,tmp=0;
285 l_int32 i;
286 l_int32 min_gap;
287
288 min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL : 3*rcv_latency ;
289 //printf("---%d\n",len);
290 for (i=2;i<len;i++)
291 {
292 delta[i] = time_to_us_delta(time[i-1],time[i]);
293 if ( delta[i] <= min_gap )
294 {
295 b2b++ ;
296 tmp++;
297 }
298 else
299 {
300 if ( tmp >=3 )
301 {
302 (*burst)++;
303 tmp=0;
304 }
305 }
306 }
307
308 //fprintf(stderr,"\tNumber of b2b %d, Number of burst %d\n",b2b,*burst);
309 if ( b2b > .6*len )
310 {
311 return 1;
312 }
313 else return 0;
314 }
315
316 /*
317 Receive N streams .
318 After each stream, compute the loss rate.
319 Mark a stream "lossy" , if losss rate in
320 that stream is more than a threshold.
321 */
recv_fleet()322 l_int32 recv_fleet()
323 {
324 struct sigaction sigstruct ;
325 struct timeval snd_tv[MAX_STREAM_LEN], arrv_tv[MAX_STREAM_LEN];
326 struct timeval current_time, first_time;
327 double pkt_loss_rate ;
328 double owd[MAX_STREAM_LEN] ;
329 double snd_tm[MAX_STREAM_LEN] ;
330 double arrv_tm[MAX_STREAM_LEN];
331 l_int32 ctr_code ;
332 l_int32 pkt_lost = 0 ;
333 l_int32 stream_id_n , stream_id=0 ;
334 l_int32 total_pkt_rcvd=0 ,pkt_rcvd = 0 ;
335 l_int32 pkt_id = 0 ;
336 l_int32 pkt_id_n = 0 ;
337 l_int32 exp_pkt_id = 0 ;
338 l_int32 stream_cnt = 0 ; /* 0->n*/
339 l_int32 fleet_id , fleet_id_n = 0 ;
340 l_int32 lossy_stream = 0 ;
341 l_int32 return_val = 0 ;
342 l_int32 finished_stream = 0 ;
343 l_int32 stream_duration ;
344 l_int32 num_sndr_cs[20],num_rcvr_cs[20];
345 char ctr_buff[8];
346 char *pkt_buf ;
347 double owdfortd[MAX_STREAM_LEN];
348 l_int32 num_substream,substream[MAX_STREAM_LEN];
349 l_int32 low,high,len,j;
350 l_int32 b2b_pkt_per_stream[20];
351 l_int32 tmp_b2b;
352 #ifdef THRLIB
353 pthread_t tid ;
354 thr_arg arg ;
355 pthread_attr_t attr ;
356 #endif
357 l_int32 num_bursts;
358 l_int32 abort_fleet=0;
359 l_int32 p=0;
360 struct timeval select_tv;
361 fd_set readset;
362 l_int32 ret_val ;
363
364 if ( (pkt_buf = malloc(cur_pkt_sz*sizeof(char)) ) == NULL )
365 {
366 printf("ERROR : unable to malloc %ld bytes \n",cur_pkt_sz);
367 exit(-1);
368 }
369 trend_idx=0;
370 ic_flag = 0;
371 if(verbose&&!Verbose)
372 printf("Receiving Fleet %ld, Rate %.2fMbps\n",exp_fleet_id,tr);
373 if(Verbose)
374 {
375 printf("\nReceiving Fleet %ld\n",exp_fleet_id);
376 printf(" Fleet Parameter(req) :: R=%.2fMbps, L=%ldB, K=%ldpackets, \
377 T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval) ;
378 }
379 fprintf(pathload_fp,"\nReceiving Fleet %ld\n",exp_fleet_id);
380 fprintf(pathload_fp," Fleet Parameter(req) :: R=%.2fMbps, L=%ldB, \
381 K=%ldpackets, T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval);
382
383 if(Verbose)
384 printf(" Lossrate per stream :: ");
385 fprintf(pathload_fp," Lossrate per stream :: ");
386
387 sigstruct.sa_handler = sig_sigusr1 ;
388 sigemptyset(&sigstruct.sa_mask);
389 sigstruct.sa_flags = 0 ;
390 #ifdef SA_INTERRUPT
391 sigstruct.sa_flags |= SA_INTERRUPT ;
392 #endif
393 sigaction(SIGUSR1 , &sigstruct,NULL );
394
395 while ( stream_cnt < num_stream )
396 {
397 #ifdef THRLIB
398 arg.finished_stream=0;
399 arg.ptid = pthread_self() ;
400 arg.stream_cnt = stream_cnt ;
401 pthread_attr_init(&attr);
402 pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
403 if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 )
404 {
405 perror("recv_fleet::pthread_create");
406 exit(-1);
407 }
408 #endif
409 pkt_lost = 0 ;
410 first_time.tv_sec = 0 ;
411 for (j=0; j < stream_len; j++ )
412 {
413 snd_tv[j].tv_sec=0 ; snd_tv[j].tv_usec=0 ;
414 arrv_tv[j].tv_sec=0; arrv_tv[j].tv_usec=0;
415 }
416
417 /* Receive K packets of ith stream */
418 #ifdef THRLIB
419 while(!arg.finished_stream)
420 #else
421 while(1)
422 #endif
423 {
424 #ifndef THRLIB
425 FD_ZERO(&readset);
426 FD_SET(sock_tcp,&readset);
427 FD_SET(sock_udp,&readset);
428 select_tv.tv_sec=1000;select_tv.tv_usec=0;
429 if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 )
430 {
431 if (FD_ISSET(sock_udp,&readset) )
432 {
433 #endif
434 if( recvfrom(sock_udp,pkt_buf,cur_pkt_sz,0,NULL,NULL) > 0 )
435 {
436 gettimeofday(¤t_time,NULL);
437 memcpy(&fleet_id_n,pkt_buf , sizeof(l_int32));
438 fleet_id = ntohl(fleet_id_n) ;
439 memcpy(&stream_id_n,pkt_buf+sizeof(l_int32) , sizeof(l_int32));
440 stream_id = ntohl(stream_id_n) ;
441 memcpy(&pkt_id_n, pkt_buf+2*sizeof(l_int32), sizeof(l_int32));
442 pkt_id = ntohl(pkt_id_n) ;
443 if ( fleet_id == exp_fleet_id && stream_id == stream_cnt &&
444 pkt_id >= exp_pkt_id )
445 {
446 if ( first_time.tv_sec == 0 )
447 first_time = current_time;
448 arrv_tv[pkt_id] = current_time ;
449 memcpy(&(snd_tv[pkt_id].tv_sec) , pkt_buf+3*sizeof(l_int32), sizeof(l_int32));
450 memcpy(&(snd_tv[pkt_id].tv_usec), pkt_buf+4*sizeof(l_int32), sizeof(l_int32));
451 if ( pkt_id > exp_pkt_id ) /* reordered are considered as lost */
452 {
453 pkt_lost += ( pkt_id - exp_pkt_id ) ;
454 exp_pkt_id = pkt_id ;
455 }
456 ++exp_pkt_id ;
457 ++pkt_rcvd;
458 }
459 } // end of recvfrom
460 #ifndef THRLIB
461 } // end of FD_ISSET
462
463 if ( FD_ISSET(sock_tcp,&readset) )
464 {
465 /* check the control connection.*/
466 if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 )
467 {
468 if ( (((ret_val & CTR_CODE) >> 31) == 1) &&
469 ((ret_val & 0x7fffffff) == FINISHED_STREAM ) )
470 {
471 while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ;
472 if ( ret_val == stream_cnt )
473 {
474 break ;
475 }
476 }
477 }
478 }
479 } // end of select
480 else
481 {
482 perror("select");
483 exit(0);
484 }
485 #endif
486
487 }
488
489 for (j=0; j < stream_len; j++ )
490 {
491 snd_tv[j].tv_sec = ntohl(snd_tv[j].tv_sec);
492 snd_tv[j].tv_usec = ntohl(snd_tv[j].tv_usec);
493 snd_tm[j]= snd_tv[j].tv_sec * 1000000.0 + snd_tv[j].tv_usec ;
494 arrv_tm[j] = arrv_tv[j].tv_sec * 1000000.0 + arrv_tv[j].tv_usec ;
495 owd[j] = arrv_tm[j] - snd_tm[j] ;
496 }
497
498 total_pkt_rcvd += pkt_rcvd ;
499 finished_stream = 0 ;
500 pkt_lost += stream_len - exp_pkt_id ;
501 pkt_loss_rate = (double )pkt_lost * 100. / stream_len ;
502 if(Verbose)
503 printf(":%.1f",pkt_loss_rate ) ;
504 fprintf(pathload_fp,":%.1f",pkt_loss_rate ) ;
505 exp_pkt_id = 0 ;
506 stream_cnt++ ;
507
508 num_bursts=0;
509 if ( interrupt_coalescence )
510 ic_flag=check_intr_coalescence(arrv_tv,pkt_rcvd,&num_bursts);
511
512 if ( pkt_loss_rate < HIGH_LOSS_RATE && pkt_loss_rate >= MEDIUM_LOSS_RATE )
513 lossy_stream++ ;
514
515 if ( pkt_loss_rate >= HIGH_LOSS_RATE || ( stream_cnt >= num_stream
516 && lossy_stream*100./stream_cnt >= MAX_LOSSY_STREAM_FRACTION ))
517 {
518 if ( increase_stream_len )
519 {
520 increase_stream_len=0;
521 lower_bound=1;
522 }
523
524 if(Verbose)
525 printf("\n Fleet aborted due to high lossrate");
526 fprintf(pathload_fp,"\n Fleet aborted due to high lossrate");
527 abort_fleet=1;
528 break ;
529 }
530 else
531 {
532 /* analyze trend in stream */
533 num += get_sndr_time_interval(snd_tm,&snd_time_interval) ;
534 adjust_offset_to_zero(owd, stream_len);
535 num_substream = eliminate_sndr_side_CS(snd_tm,substream);
536 num_sndr_cs[stream_cnt-1] = num_substream ;
537 substream[num_substream++]=stream_len-1;
538 low=0;
539 num_rcvr_cs[stream_cnt-1]=0;
540 tmp_b2b=0;
541 for (j=0;j<num_substream;j++)
542 {
543 high=substream[j];
544 if ( ic_flag )
545 {
546 if ( num_bursts < 2 )
547 {
548 if ( ++repeat_1 == 3)
549 {
550 repeat_1=0;
551 /* Abort fleet and try to find lower bound */
552 abort_fleet=1;
553 lower_bound=1;
554 increase_stream_len=0;
555 break ;
556 }
557 }
558 else if ( num_bursts <= 5 )
559 {
560 if ( ++repeat_2 == 3)
561 {
562 repeat_2=0;
563 /* Abort fleet and retry with longer stream length */
564 abort_fleet=1;
565 increase_stream_len=1;
566 break ;
567 }
568 }
569 else
570 {
571 increase_stream_len=0;
572 len=eliminate_b2b_pkt_ic(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b);
573 /*
574 for(p=0;p<len;p++)
575 printf("%d %f\n",p,owdfortd[p]);
576 */
577 pct_metric[trend_idx]=
578 pairwise_comparision_test(owdfortd , 0 , len );
579 pdt_metric[trend_idx]=
580 pairwise_diff_test(owdfortd , 0, len );
581 trend_idx+=1;
582 }
583 }
584 else
585 {
586 len=eliminate_rcvr_side_CS(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b);
587 if ( len > MIN_STREAM_LEN )
588 {
589 get_trend(owdfortd,len);
590 }
591 }
592 low=high+1;
593 }
594 if ( abort_fleet )
595 break;
596 else
597 {
598 b2b_pkt_per_stream[stream_cnt-1] = tmp_b2b ;
599 ctr_code = CONTINUE_STREAM | CTR_CODE;
600 send_ctr_mesg(ctr_buff, ctr_code);
601 }
602 }
603 pkt_rcvd = 0 ;
604
605 /* A hack for slow links */
606 stream_duration = stream_len * time_interval ;
607 if ( stream_duration >= 500000 )
608 {
609 slow=1;
610 break ;
611 }
612 } /*end of while (stream_cnt < num_stream ). */
613
614 if ( Verbose ) printf("\n");
615 fprintf(pathload_fp,"\n");
616
617 if ( abort_fleet )
618 {
619 printf("\tAborting fleet. Stream_cnt %d\n",stream_cnt);
620 ctr_code = ABORT_FLEET | CTR_CODE;
621 send_ctr_mesg(ctr_buff , ctr_code ) ;
622 return_val = -1 ;
623 }
624 else
625 print_contextswitch_info(num_sndr_cs,num_rcvr_cs,b2b_pkt_per_stream,stream_cnt);
626
627 exp_fleet_id++ ;
628 free(pkt_buf);
629 return return_val ;
630 }
631
print_contextswitch_info(l_int32 num_sndr_cs[],l_int32 num_rcvr_cs[],l_int32 discard[],l_int32 stream_cnt)632 void print_contextswitch_info(l_int32 num_sndr_cs[], l_int32 num_rcvr_cs[],l_int32 discard[],l_int32 stream_cnt)
633 {
634 l_int32 j;
635
636 if (Verbose)
637 printf(" # of CS @ sndr :: ");
638 fprintf(pathload_fp," # of CS @ sndr :: ");
639
640 for(j=0;j<stream_cnt-1;j++)
641 {
642 if (Verbose) printf(":%2d",num_sndr_cs[j]);
643 fprintf(pathload_fp,":%2d",num_sndr_cs[j]);
644 }
645 if ( Verbose ) printf("\n");
646 fprintf(pathload_fp,"\n");
647 if (Verbose)
648 printf(" # of CS @ rcvr :: ");
649 fprintf(pathload_fp," # of CS @ rcvr :: ");
650 for(j=0;j<stream_cnt-1;j++)
651 {
652 if (Verbose) printf(":%2d",num_rcvr_cs[j]);
653 fprintf(pathload_fp,":%2d",num_rcvr_cs[j]);
654 }
655 if ( Verbose ) printf("\n");
656 fprintf(pathload_fp,"\n");
657
658 if (Verbose)
659 printf(" # of DS @ rcvr :: ");
660 fprintf(pathload_fp," # of DS @ rcvr :: ");
661 for(j=0;j<stream_cnt-1;j++)
662 {
663 if (Verbose) printf(":%2d",discard[j]);
664 fprintf(pathload_fp,":%2d",discard[j]);
665 }
666 if ( Verbose ) printf("\n");
667 fprintf(pathload_fp,"\n");
668 }
669
sig_sigusr1()670 void sig_sigusr1()
671 {
672 return;
673 }
674
sig_alrm()675 void sig_alrm()
676 {
677 terminate_gracefully(exp_start_time);
678 exit(0);
679 }
680
ctrl_listen(void * arg)681 void *ctrl_listen(void *arg)
682 {
683 struct timeval select_tv;
684 fd_set readset;
685 l_int32 ret_val ;
686 char ctr_buff[8];
687
688 #ifdef THRLIB
689 FD_ZERO(&readset);
690 FD_SET(sock_tcp,&readset);
691 select_tv.tv_sec=100;select_tv.tv_usec=0;
692 if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 )
693 {
694 /* check ... mesg received */
695 if ( FD_ISSET(sock_tcp,&readset) )
696 {
697 /* check the control connection.*/
698 if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 )
699 {
700 if ( (((ret_val & CTR_CODE) >> 31) == 1) &&
701 ((ret_val & 0x7fffffff) == FINISHED_STREAM ) )
702 {
703 while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ;
704 if ( ret_val == ((thr_arg *)arg)->stream_cnt )
705 {
706 ((thr_arg *)arg)->finished_stream =1 ;
707 pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1);
708 pthread_exit(NULL);
709 }
710 }
711 else if ( (((ret_val & CTR_CODE) >> 31) == 1) &&
712 ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) )
713 {
714 select_tv.tv_usec = 2000 ;
715 select_tv.tv_sec = 0 ;
716 select(1,NULL,NULL,NULL,&select_tv);
717 ((thr_arg *)arg)->finished_stream =1 ;
718 pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1);
719 pthread_exit(NULL);
720 }
721 }
722 }
723 }
724 #endif
725 return NULL;
726 }
727
get_trend(double owdfortd[],l_int32 pkt_cnt)728 void get_trend(double owdfortd[],l_int32 pkt_cnt )
729 {
730 double median_owd[MAX_STREAM_LEN];
731 l_int32 median_owd_len=0;
732 double ordered[MAX_STREAM_LEN];
733 l_int32 j,count,pkt_per_min;
734 //pkt_per_min = 5 ;
735 pkt_per_min = (int)floor(sqrt((double)pkt_cnt));
736 count = 0 ;
737 for ( j = 0 ; j < pkt_cnt ; j=j+pkt_per_min )
738 {
739 if ( j+pkt_per_min >= pkt_cnt )
740 count = pkt_cnt - j ;
741 else
742 count = pkt_per_min;
743 order_dbl(owdfortd , ordered ,j,count ) ;
744 if ( count % 2 == 0 )
745 median_owd[median_owd_len++] =
746 ( ordered[(int)(count*.5) -1] + ordered[(int)(count*0.5)] )/2 ;
747 else
748 median_owd[median_owd_len++] = ordered[(int)(count*0.5)] ;
749 }
750 pct_metric[trend_idx]=
751 pairwise_comparision_test(median_owd , 0 , median_owd_len );
752 pdt_metric[trend_idx]=
753 pairwise_diff_test(median_owd , 0, median_owd_len );
754 trend_idx+=1;
755 }
756
757 /*
758 Order an array of doubles using bubblesort
759 */
order_dbl(double unord_arr[],double ord_arr[],l_int32 start,l_int32 num_elems)760 void order_dbl(double unord_arr[], double ord_arr[],l_int32 start, l_int32 num_elems)
761 {
762 l_int32 i,j,k;
763 double temp;
764 for (i=start,k=0;i<start+num_elems;i++,k++) ord_arr[k]=unord_arr[i];
765
766 for (i=1;i<num_elems;i++)
767 {
768 for (j=i-1;j>=0;j--)
769 if (ord_arr[j+1] < ord_arr[j])
770 {
771 temp=ord_arr[j];
772 ord_arr[j]=ord_arr[j+1];
773 ord_arr[j+1]=temp;
774 }
775 else break;
776 }
777 }
778
779 /*
780 Order an array of float using bubblesort
781 */
order_float(float unord_arr[],float ord_arr[],l_int32 start,l_int32 num_elems)782 void order_float(float unord_arr[], float ord_arr[],l_int32 start, l_int32 num_elems)
783 {
784 l_int32 i,j,k;
785 double temp;
786 for (i=start,k=0;i<start+num_elems;i++,k++) ord_arr[k]=unord_arr[i];
787 for (i=1;i<num_elems;i++)
788 {
789 for (j=i-1;j>=0;j--)
790 if (ord_arr[j+1] < ord_arr[j])
791 {
792 temp=ord_arr[j];
793 ord_arr[j]=ord_arr[j+1];
794 ord_arr[j+1]=temp;
795 }
796 else break;
797 }
798 }
799
800 /*
801 Order an array of l_int32 using bubblesort
802 */
order_int(l_int32 unord_arr[],l_int32 ord_arr[],l_int32 num_elems)803 void order_int(l_int32 unord_arr[], l_int32 ord_arr[], l_int32 num_elems)
804 {
805 l_int32 i,j;
806 l_int32 temp;
807 for (i=0;i<num_elems;i++) ord_arr[i]=unord_arr[i];
808 for (i=1;i<num_elems;i++) {
809 for (j=i-1;j>=0;j--)
810 if (ord_arr[j+1] < ord_arr[j]) {
811 temp=ord_arr[j];
812 ord_arr[j]=ord_arr[j+1];
813 ord_arr[j+1]=temp;
814 }
815 else break;
816 }
817 }
818
819 /*
820 Send a message through the control stream
821 */
send_ctr_mesg(char * ctr_buff,l_int32 ctr_code)822 void send_ctr_mesg(char *ctr_buff, l_int32 ctr_code)
823 {
824 l_int32 ctr_code_n = htonl(ctr_code);
825 memcpy((void*)ctr_buff, &ctr_code_n, sizeof(l_int32));
826 if (write(sock_tcp, ctr_buff, sizeof(l_int32)) != sizeof(l_int32))
827 {
828 fprintf(stderr, "send control message failed:\n");
829 exit(-1);
830 }
831 }
832
833
834 /*
835 Receive message from the control stream
836 */
recv_ctr_mesg(l_int32 ctr_strm,char * ctr_buff)837 l_int32 recv_ctr_mesg(l_int32 ctr_strm, char *ctr_buff)
838 {
839 l_int32 ctr_code;
840 gettimeofday(&first_time,0);
841 if (read(ctr_strm, ctr_buff, sizeof(l_int32)) != sizeof(l_int32))
842 return(-1);
843 gettimeofday(&second_time,0);
844 memcpy(&ctr_code, ctr_buff, sizeof(l_int32));
845 return(ntohl(ctr_code));
846 }
847
848 /*
849 Compute the time difference in microseconds
850 between two timeval measurements
851 */
time_to_us_delta(struct timeval tv1,struct timeval tv2)852 double time_to_us_delta(struct timeval tv1, struct timeval tv2)
853 {
854 double time_us;
855 time_us = (double)
856 ((tv2.tv_sec-tv1.tv_sec)*1000000+(tv2.tv_usec-tv1.tv_usec));
857 return time_us;
858 }
859
860 /*
861 Compute the average of the set of measurements <data>.
862 */
get_avg(double data[],l_int32 num_values)863 double get_avg(double data[], l_int32 num_values)
864 {
865 l_int32 i;
866 double sum_;
867 sum_ = 0;
868 for (i=0; i<num_values; i++) sum_ += data[i];
869 return (sum_ / (double)num_values);
870 }
871
872 /*
873 PCT test to detect increasing trend in stream
874 */
pairwise_comparision_test(double array[],l_int32 start,l_int32 end)875 double pairwise_comparision_test (double array[] ,l_int32 start , l_int32 end)
876 {
877 l_int32 improvement = 0 ,i ;
878 double total ;
879
880 if ( ( end - start ) >= MIN_PARTITIONED_STREAM_LEN )
881 {
882 for ( i = start ; i < end - 1 ; i++ )
883 {
884 if ( array[i] < array[i+1] )
885 improvement += 1 ;
886 }
887 total = ( end - start ) ;
888 return ( (double)improvement/total ) ;
889 }
890 else
891 return -1 ;
892 }
893
894 /*
895 PDT test to detect increasing trend in stream
896 */
pairwise_diff_test(double array[],l_int32 start,l_int32 end)897 double pairwise_diff_test(double array[] ,l_int32 start , l_int32 end)
898 {
899 double y = 0 , y_abs = 0 ;
900 l_int32 i ;
901 if ( ( end - start ) >= MIN_PARTITIONED_STREAM_LEN )
902 {
903 for ( i = start+1 ; i < end ; i++ )
904 {
905 y += array[i] - array[i-1] ;
906 y_abs += fabs(array[i] - array[i-1]) ;
907 }
908 return y/y_abs ;
909 }
910 else
911 return 2. ;
912 }
913
grey_bw_resolution()914 double grey_bw_resolution()
915 {
916 if ( adr )
917 return (.05*adr<12?.05*adr:12) ;
918 else
919 return min_rate ;
920 }
921
922 /*
923 test if Rmax and Rmin range is smaller than
924 user specified bw resolution
925 or
926 if Gmin and Gmax range is smaller than grey
927 bw resolution.
928 */
converged()929 l_int32 converged()
930 {
931 int ret_val=0;
932 if ( (converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm )
933 ret_val=1;
934 else if ( tr_max != 0 && tr_max != tr_min )
935 {
936 if ( tr_max - tr_min <= bw_resol )
937 {
938 converged_rmn_rmx=1;
939 ret_val=1;
940 }
941 else if( tr_max - grey_max <= grey_bw_resolution() &&
942 grey_min - tr_min <= grey_bw_resolution() )
943 {
944 converged_gmn_rmn = 1;
945 converged_gmx_rmx = 1;
946 ret_val=1;
947 }
948 }
949 return ret_val ;
950 }
951
952 /*
953 Calculate next fleet rate
954 when fleet showed INCREASING trend.
955 */
radj_increasing()956 void radj_increasing()
957 {
958 if ( grey_max != 0 && grey_max >= tr_min )
959 {
960 if ( tr_max - grey_max <= grey_bw_resolution() )
961 {
962 converged_gmx_rmx = 1;
963 exp_flag=0;
964 if ( grey_min || tr_min )
965 radj_notrend() ;
966 else
967 {
968 if ( grey_min < grey_max )
969 tr = grey_min/2. ;
970 else
971 tr = grey_max/2. ;
972 }
973 }
974 else
975 tr = ( tr_max + grey_max)/2. ;
976 }
977 else
978 tr = (tr_max + tr_min)/2.<min_rate?min_rate:(tr_min+tr_max)/2. ;
979 }
980
981
982 /*
983 Calculate next fleet rate
984 when fleet showed NOTREND trend.
985 */
radj_notrend()986 void radj_notrend()
987 {
988 if ( exp_flag )
989 tr = 2*tr>max_rate?max_rate:2*tr ;
990 else
991 {
992 if ( grey_min != 0 && grey_min <= tr_max )
993 {
994 if ( grey_min - tr_min <= grey_bw_resolution() )
995 {
996 converged_gmn_rmn = 1;
997 radj_increasing() ;
998 }
999 else
1000 tr = (tr_min+grey_min)/2.<min_rate?min_rate:(tr_min+grey_min)/2. ;
1001 }
1002 else
1003 tr = (tr_max + tr_min)/2.<min_rate?min_rate:(tr_min+tr_max)/2. ;
1004 }
1005 }
1006
1007
1008 /*
1009 Calculate next fleet rate
1010 when fleet showed GREY trend.
1011 */
radj_greymax()1012 void radj_greymax()
1013 {
1014 if ( tr_max == 0 )
1015 tr = (tr+.5*tr)<max_rate?(tr+.5*tr):max_rate ;
1016 else if ( tr_max - grey_max <= grey_bw_resolution() )
1017 {
1018 converged_gmx_rmx = 1;
1019 radj_greymin() ;
1020 }
1021 else
1022 tr = ( tr_max + grey_max)/2. ;
1023 }
1024
1025 /*
1026 Calculate next fleet rate
1027 when fleet showed GREY trend.
1028 */
radj_greymin()1029 void radj_greymin()
1030 {
1031 if ( grey_min - tr_min <= grey_bw_resolution() )
1032 {
1033 converged_gmn_rmn = 1;
1034 radj_greymax() ;
1035 }
1036 else
1037 tr = (tr_min+grey_min)/2.<min_rate?min_rate:(tr_min+grey_min)/2. ;
1038 }
1039
1040
1041 /*
1042 dpending upon trend in fleet :-
1043 - update the state variables.
1044 - decide the next fleet rate
1045 return -1 when converged
1046 */
rate_adjustment(l_int32 flag)1047 l_int32 rate_adjustment(l_int32 flag)
1048 {
1049 l_int32 ret_val = 0 ;
1050 if( flag == INCREASING )
1051 {
1052 if ( max_rate_flag)
1053 max_rate_flag=0;
1054 if ( grey_max >= tr )
1055 grey_max = grey_min = 0 ;
1056 tr_max = tr ;
1057 if (!converged_gmx_rmx_tm )
1058 {
1059 if ( !converged() )
1060 radj_increasing() ;
1061 else
1062 ret_val=-1 ; //return -1;
1063 }
1064 else
1065 {
1066 exp_flag = 0 ;
1067 if ( !converged() )
1068 radj_notrend() ;
1069 }
1070 }
1071 else if ( flag == NOTREND )
1072 {
1073 if ( grey_min < tr )
1074 grey_min = 0 ;
1075 if ( grey_max < tr )
1076 grey_max = grey_min = 0 ;
1077 if ( tr > tr_min )
1078 tr_min = tr ;
1079 if ( !converged_gmn_rmn_tm && !converged() )
1080 radj_notrend() ;
1081 else
1082 ret_val=-1 ; //return -1 ;
1083 }
1084 else if ( flag == GREY )
1085 {
1086 if ( grey_max == 0 && grey_min == 0 )
1087 grey_max = grey_min = tr ;
1088 if (tr==grey_max || tr>grey_max )
1089 {
1090 grey_max = tr ;
1091 if ( !converged_gmx_rmx_tm )
1092 {
1093 if ( !converged() )
1094 radj_greymax() ;
1095 else
1096 ret_val=-1 ; //return -1 ;
1097 }
1098 else
1099 {
1100 exp_flag = 0 ;
1101 if ( !converged() )
1102 radj_notrend() ;
1103 else
1104 ret_val=-1;
1105 }
1106 }
1107 else if ( tr < grey_min || grey_min == 0 )
1108 {
1109 grey_min = tr ;
1110 if ( !converged() )
1111 radj_greymin() ;
1112 else
1113 ret_val=-1 ; //return -1 ;
1114 }
1115 }
1116
1117 if (Verbose)
1118 {
1119 printf(" Rmin-Rmax :: %.2f-%.2fMbps\n",tr_min,tr_max);
1120 printf(" Gmin-Gmax :: %.2f-%.2fMbps\n",grey_min,grey_max);
1121 }
1122 fprintf(pathload_fp," Rmin-Rmax :: %.2f-%.2fMbps\n",tr_min,tr_max);
1123 fprintf(pathload_fp," Gmin-Gmax :: %.2f-%.2fMbps\n",grey_min,grey_max);
1124
1125 if ( ret_val == -1 )
1126 return -1 ;
1127 if ( tr >= max_rate )
1128 max_rate_flag++ ;
1129
1130 if ( max_rate_flag > 1 )
1131 return -1 ;
1132 if ( min_rate_flag > 1 )
1133 return -1 ;
1134 transmission_rate = (l_int32) rint(1000000 * tr ) ;
1135 return 0 ;
1136 }
1137
1138 /*
1139 calculates fleet param L,T .
1140 calc_param returns -1, if we have
1141 reached to upper/lower limits of the
1142 stream parameters like L,T .
1143 otherwise returns 0 .
1144 */
calc_param()1145 l_int32 calc_param()
1146 {
1147 double tmp_tr ;
1148 l_int32 tmp ;
1149 l_int32 tmp_time_interval;
1150 if (tr < 150 )
1151 {
1152 time_interval = 80>min_time_interval?80:min_time_interval ;
1153 cur_pkt_sz=rint(tr*time_interval/8.) - 28;
1154 if ( cur_pkt_sz < MIN_PKT_SZ )
1155 {
1156 cur_pkt_sz = MIN_PKT_SZ ;
1157 time_interval =rint ((cur_pkt_sz + 28)*8./tr) ;
1158 tr = ( cur_pkt_sz + 28 )*8. /time_interval ;
1159 }
1160 else if ( cur_pkt_sz > max_pkt_sz )
1161 {
1162 cur_pkt_sz = max_pkt_sz;
1163 time_interval = min_time_interval ;
1164 tmp_tr = ( cur_pkt_sz + 28 )*8. /time_interval ;
1165 if ( equal(tr,tmp_tr))
1166 tr = tmp_tr;
1167 else
1168 return -1 ;
1169 }
1170 }
1171 else if ( tr < 600 )
1172 {
1173 tmp_tr = tr ;
1174 tmp_time_interval = rint(( max_pkt_sz + 28 )* 8 / tr) ;
1175 if ( cur_pkt_sz == max_pkt_sz && tmp_time_interval == time_interval )
1176 return -1 ;
1177 time_interval = tmp_time_interval ;
1178 tmp=rint(tr*time_interval/8.)-28;
1179 cur_pkt_sz=tmp<max_pkt_sz?tmp:max_pkt_sz;
1180 tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
1181 if ((tr_min && (equal(tr,tr_min) || tr<tr_min))
1182 || (grey_max && tmp_tr>grey_max && (equal(tr,grey_max) || tr<grey_max)))
1183 {
1184 do
1185 {
1186 --time_interval;
1187 cur_pkt_sz=rint(tr*time_interval/8.)-28;
1188 }while (cur_pkt_sz > max_pkt_sz);
1189 tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
1190 }
1191 }
1192 else
1193 {
1194 cur_pkt_sz = max_pkt_sz ;
1195 time_interval = rint(( cur_pkt_sz + 28 )* 8 / tr) ;
1196 tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
1197 if ((tr_min && (equal(tr,tr_min) || tr<tr_min)) )
1198 {
1199 return -1 ;
1200 }
1201 if( equal(tr,tr_max) )
1202 {
1203 tr_max = tr ;
1204 if ( grey_max )
1205 {
1206 converged_gmx_rmx_tm=1;
1207 if ( !converged_gmn_rmn && !converged_gmn_rmn_tm )
1208 radj_notrend();
1209 else return -1 ;
1210 }
1211 else
1212 {
1213 converged_rmn_rmx=1;
1214 return -1 ;
1215 }
1216 }
1217 }
1218 return 0 ;
1219 }
1220
1221 /*
1222 splits stream iff sender sent packets more than
1223 time_interval+1000 usec apart.
1224 */
eliminate_sndr_side_CS(double sndr_time_stamp[],l_int32 split_owd[])1225 l_int32 eliminate_sndr_side_CS (double sndr_time_stamp[], l_int32 split_owd[])
1226 {
1227 l_int32 j = 0,k=0;
1228 l_int32 cs_threshold;
1229
1230 cs_threshold = 2*time_interval>time_interval+1000?2*time_interval:time_interval+1000;
1231 for ( k = 0 ; k < stream_len-1 ; k++ )
1232 {
1233 if ( sndr_time_stamp[k] == 0 || sndr_time_stamp[k+1] == 0 )
1234 continue;
1235 else if ((sndr_time_stamp[k+1]-sndr_time_stamp[k]) > cs_threshold)
1236 split_owd[j++] = k;
1237 }
1238 return j ;
1239 }
1240
1241 /*
1242 discards owd of packets received when
1243 receiver was NOT running.
1244 */
eliminate_rcvr_side_CS(double rcvr_time_stamp[],double owd[],double owdfortd[],l_int32 low,l_int32 high,l_int32 * num_rcvr_cs,l_int32 * tmp_b2b)1245 l_int32 eliminate_rcvr_side_CS ( double rcvr_time_stamp[] , double owd[],double owdfortd[], l_int32 low,l_int32 high,l_int32 *num_rcvr_cs,l_int32 *tmp_b2b )
1246 {
1247 l_int32 b2b_pkt[MAX_STREAM_LEN] ;
1248 l_int32 i,k=0 ;
1249 l_int32 len=0;
1250 l_int32 min_gap;
1251
1252 min_gap = MIN_TIME_INTERVAL > 1.5*rcv_latency ? MIN_TIME_INTERVAL :2.5*rcv_latency ;
1253 for ( i = low ; i <= high ; i++ )
1254 {
1255 if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 )
1256 continue ;
1257 else if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])> min_gap)
1258 owdfortd[len++] = owd[i];
1259 else
1260 b2b_pkt[k++] = i ;
1261 }
1262
1263 /* go through discarded list and count b2b discards as 1 CS instance */
1264 for (i=1;i<k;i++)
1265 if ( b2b_pkt[i]-b2b_pkt[i-1] != 1)
1266 (*num_rcvr_cs)++;
1267 *tmp_b2b += k;
1268 return len ;
1269 }
1270
1271 /* eliminates packets received b2b due to IC */
eliminate_b2b_pkt_ic(double rcvr_time_stamp[],double owd[],double owdfortd[],l_int32 low,l_int32 high,l_int32 * num_rcvr_cs,l_int32 * tmp_b2b)1272 l_int32 eliminate_b2b_pkt_ic ( double rcvr_time_stamp[] , double owd[],double owdfortd[], l_int32 low,l_int32 high,l_int32 *num_rcvr_cs,l_int32 *tmp_b2b )
1273 {
1274 l_int32 b2b_pkt[MAX_STREAM_LEN] ;
1275 l_int32 i,k=0 ;
1276 l_int32 len=0;
1277 l_int32 min_gap;
1278 l_int32 tmp=0;
1279
1280 min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL :3*rcv_latency ;
1281 for ( i = low ; i <= high ; i++ )
1282 {
1283 if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 )
1284 continue ;
1285
1286 //fprintf(stderr,"i %d owd %.2f dispersion %.2f",i, owd[i],rcvr_time_stamp[i+1]- rcvr_time_stamp[i]);
1287 if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])< min_gap)
1288 {
1289 b2b_pkt[k++] = i ;
1290 tmp++;
1291 //fprintf(stderr," b\n");
1292 }
1293 else
1294 {
1295 if ( tmp >= 3 )
1296 {
1297 //fprintf(stderr," j\n");
1298 tmp=0;
1299 owdfortd[len++] = owd[i];
1300 }
1301 }
1302 }
1303 return len ;
1304 }
1305
1306 /* Adjust offset to zero again */
adjust_offset_to_zero(double owd[],l_int32 len)1307 void adjust_offset_to_zero(double owd[], l_int32 len)
1308 {
1309 l_int32 owd_min = 0;
1310 l_int32 i ;
1311 for (i=0; i< len; i++) {
1312 if ( owd_min == 0 && owd[i] != 0 ) owd_min=owd[i];
1313 else if (owd_min != 0 && owd[i] != 0 && owd[i]<owd_min) owd_min=owd[i];
1314 }
1315
1316 for (i=0; i< len; i++) {
1317 if ( owd[i] != 0 )
1318 owd[i] -= owd_min;
1319 }
1320 }
1321
1322 #define INCR 1
1323 #define NOTR 2
1324 #define DISCARD 3
1325 #define UNCL 4
get_pct_trend(double pct_metric[],l_int32 pct_trend[],l_int32 pct_result_cnt)1326 void get_pct_trend(double pct_metric[], l_int32 pct_trend[], l_int32 pct_result_cnt )
1327 {
1328 l_int32 i ;
1329 for (i=0; i < pct_result_cnt;i++ )
1330 {
1331 pct_trend[i] = UNCL ;
1332 if ( pct_metric[i] == -1 )
1333 {
1334 if (Verbose)
1335 printf("d");
1336 fprintf(pathload_fp,"d");
1337 pct_trend[i] = DISCARD ;
1338 }
1339 else if ( pct_metric[i] > 1.1 * PCT_THRESHOLD )
1340 {
1341 if (Verbose)
1342 printf("I");
1343 fprintf(pathload_fp,"I");
1344 pct_trend[i] = INCR ;
1345 }
1346 else if ( pct_metric[i] < .9 * PCT_THRESHOLD )
1347 {
1348 if (Verbose)
1349 printf("N");
1350 fprintf(pathload_fp,"N");
1351 pct_trend[i] = NOTR ;
1352 }
1353 else if(pct_metric[i] <= PCT_THRESHOLD*1.1 && pct_metric[i] >= PCT_THRESHOLD*.9 )
1354 {
1355 if (Verbose)
1356 printf("U");
1357 fprintf(pathload_fp,"U");
1358 pct_trend[i] = UNCL ;
1359 }
1360 }
1361 if (Verbose)
1362 printf("\n");
1363 fprintf(pathload_fp,"\n");
1364 }
1365
get_pdt_trend(double pdt_metric[],l_int32 pdt_trend[],l_int32 pdt_result_cnt)1366 void get_pdt_trend(double pdt_metric[], l_int32 pdt_trend[], l_int32 pdt_result_cnt )
1367 {
1368 l_int32 i ;
1369 for (i=0; i < pdt_result_cnt;i++ )
1370 {
1371 if ( pdt_metric[i] == 2 )
1372 {
1373 if (Verbose)
1374 printf("d");
1375 fprintf(pathload_fp,"d");
1376 pdt_trend[i] = DISCARD ;
1377 }
1378 else if ( pdt_metric[i] > 1.1 * PDT_THRESHOLD )
1379 {
1380 if (Verbose)
1381 printf("I");
1382 fprintf(pathload_fp,"I");
1383 pdt_trend[i] = INCR ;
1384 }
1385 else if ( pdt_metric[i] < .9 * PDT_THRESHOLD )
1386 {
1387 if (Verbose)
1388 printf("N");
1389 fprintf(pathload_fp,"N");
1390 pdt_trend[i] = NOTR ;
1391 }
1392 else if ( pdt_metric[i] <= PDT_THRESHOLD*1.1 && pdt_metric[i] >= PDT_THRESHOLD*.9 )
1393 {
1394 if (Verbose)
1395 printf("U");
1396 fprintf(pathload_fp,"U");
1397 pdt_trend[i] = UNCL ;
1398 }
1399 }
1400 if (Verbose)
1401 printf("\n");
1402 fprintf(pathload_fp,"\n");
1403 }
1404
1405 /*
1406 returns : trend in fleet or -1 if more than 50% of stream were discarded
1407 */
aggregate_trend_result()1408 l_int32 aggregate_trend_result()
1409 {
1410 l_int32 total=0 ,i_cnt = 0, n_cnt = 0;
1411 l_int32 num_dscrd_strm=0;
1412 l_int32 i=0;
1413 l_int32 pct_trend[TREND_ARRAY_LEN] , pdt_trend[TREND_ARRAY_LEN] ;
1414
1415 if (Verbose)
1416 printf(" PCT metric/stream[%2d] :: ",trend_idx);
1417 fprintf(pathload_fp," PCT metric/stream[%2d] :: ",trend_idx);
1418 for (i=0; i < trend_idx;i++ )
1419 {
1420 if (Verbose)
1421 printf("%3.2f:",pct_metric[i]);
1422 fprintf(pathload_fp,"%3.2f:",pct_metric[i]);
1423 }
1424 if (Verbose)
1425 printf("\n");
1426 fprintf(pathload_fp,"\n");
1427 if (Verbose)
1428 printf(" PDT metric/stream[%2d] :: ",trend_idx);
1429 fprintf(pathload_fp," PDT metric/stream[%2d] :: ",trend_idx);
1430 for (i=0; i < trend_idx;i++ )
1431 {
1432 if (Verbose)
1433 printf("%3.2f:",pdt_metric[i]);
1434 fprintf(pathload_fp,"%3.2f:",pdt_metric[i]);
1435 }
1436 if (Verbose)
1437 printf("\n");
1438 fprintf(pathload_fp,"\n");
1439 if (Verbose)
1440 printf(" PCT Trend/stream [%2d] :: ",trend_idx);
1441 fprintf(pathload_fp," PCT Trend/stream [%2d] :: ",trend_idx);
1442 get_pct_trend(pct_metric,pct_trend,trend_idx);
1443 if (Verbose)
1444 printf(" PDT Trend/stream [%2d] :: ",trend_idx);
1445 fprintf(pathload_fp," PDT Trend/stream [%2d] :: ",trend_idx);
1446 get_pdt_trend(pdt_metric,pdt_trend,trend_idx);
1447
1448 if (Verbose)
1449 printf(" Trend per stream [%2d] :: ",trend_idx);
1450 fprintf(pathload_fp," Trend per stream [%2d] :: ",trend_idx);
1451 for (i=0; i < trend_idx;i++ )
1452 {
1453 if ( pct_trend[i] == DISCARD || pdt_trend[i] == DISCARD )
1454 {
1455 if (Verbose)
1456 printf("d");
1457 fprintf(pathload_fp,"d");
1458 num_dscrd_strm++ ;
1459 }
1460 else if ( pct_trend[i] == INCR && pdt_trend[i] == INCR )
1461 {
1462 if (Verbose)
1463 printf("I");
1464 fprintf(pathload_fp,"I");
1465 i_cnt++;
1466 }
1467 else if ( pct_trend[i] == NOTR && pdt_trend[i] == NOTR )
1468 {
1469 if (Verbose)
1470 printf("N");
1471 fprintf(pathload_fp,"N");
1472 n_cnt++;
1473 }
1474 else if ( pct_trend[i] == INCR && pdt_trend[i] == UNCL )
1475 {
1476 if (Verbose)
1477 printf("I");
1478 fprintf(pathload_fp,"I");
1479 i_cnt++;
1480 }
1481 else if ( pct_trend[i] == NOTR && pdt_trend[i] == UNCL )
1482 {
1483 if (Verbose)
1484 printf("N");
1485 fprintf(pathload_fp,"N");
1486 n_cnt++;
1487 }
1488 else if ( pdt_trend[i] == INCR && pct_trend[i] == UNCL )
1489 {
1490 if (Verbose)
1491 printf("I");
1492 fprintf(pathload_fp,"I");
1493 i_cnt++;
1494 }
1495 else if ( pdt_trend[i] == NOTR && pct_trend[i] == UNCL )
1496 {
1497 if (Verbose)
1498 printf("N");
1499 fprintf(pathload_fp,"N");
1500 n_cnt++ ;
1501 }
1502 else
1503 {
1504 if (Verbose)
1505 printf("U");
1506 fprintf(pathload_fp,"U");
1507 }
1508 total++ ;
1509 }
1510 if (Verbose) printf("\n");
1511 fprintf(pathload_fp,"\n");
1512
1513 /* check whether number of usable streams is
1514 atleast 50% of requested number of streams */
1515 total-=num_dscrd_strm ;
1516 if ( total < num_stream/2 && !slow && !interrupt_coalescence)
1517 {
1518 bad_fleet_cs = 1 ;
1519 retry_fleet_cnt_cs++ ;
1520 return -1 ;
1521 }
1522 else
1523 {
1524 bad_fleet_cs = 0 ;
1525 retry_fleet_cnt_cs=0;
1526 }
1527
1528 if( (double)i_cnt/(total) >= AGGREGATE_THRESHOLD )
1529 {
1530 if (Verbose)
1531 printf(" Aggregate trend :: INCREASING\n");
1532 fprintf(pathload_fp," Aggregate trend :: INCREASING\n");
1533 return INCREASING ;
1534 }
1535 else if( (double)n_cnt/(total) >= AGGREGATE_THRESHOLD )
1536 {
1537 if (Verbose)
1538 printf(" Aggregate trend :: NO TREND\n");
1539 fprintf(pathload_fp," Aggregate trend :: NO TREND\n");
1540 return NOTREND ;
1541 }
1542 else
1543 {
1544 if (Verbose)
1545 printf(" Aggregate trend :: GREY\n");
1546 fprintf(pathload_fp," Aggregate trend :: GREY\n");
1547 return GREY ;
1548 }
1549 }
1550
get_sndr_time_interval(double snd_time[],double * sum)1551 l_int32 get_sndr_time_interval(double snd_time[],double *sum)
1552 {
1553 l_int32 k,j=0,new_j=0;
1554 double ordered[MAX_STREAM_LEN] ;
1555 double ltime_interval[MAX_STREAM_LEN] ;
1556 for ( k = 0; k < stream_len-1; k++ )
1557 {
1558 if ( snd_time[k] == 0 || snd_time[k+1] == 0 )
1559 continue;
1560 else
1561 ltime_interval[j++] = snd_time[k+1] - snd_time[k] ;
1562 }
1563 order_dbl(ltime_interval, ordered , 0, j ) ;
1564 /* discard the top 15% as outliers */
1565 new_j = j - rint(j*.15) ;
1566 for ( k = 0 ; k < new_j ; k++ )
1567 *sum += ordered[k] ;
1568 return new_j ;
1569 }
1570
get_sending_rate()1571 void get_sending_rate()
1572 {
1573 time_interval = snd_time_interval/num;
1574 cur_req_rate = tr ;
1575 cur_actual_rate = (28 + cur_pkt_sz) * 8. / time_interval ;
1576
1577 if( !equal(cur_req_rate, cur_actual_rate) )
1578 {
1579 if( !grey_max && !grey_min )
1580 {
1581 if( tr_min && tr_max && (less_than(cur_actual_rate,tr_min)||equal(cur_actual_rate,tr_min)))
1582 converged_rmn_rmx_tm = 1;
1583 if( tr_min && tr_max && (less_than(tr_max,cur_actual_rate)||equal(tr_max,cur_actual_rate)))
1584 converged_rmn_rmx_tm = 1;
1585
1586 }
1587 else if ( cur_req_rate < tr_max && cur_req_rate > grey_max )
1588 {
1589 if( !(less_than(cur_actual_rate,tr_max)&&grtr_than(cur_actual_rate,grey_max)) )
1590 converged_gmx_rmx_tm = 1;
1591 }
1592 else if ( cur_req_rate < grey_min && cur_req_rate > tr_min )
1593 {
1594 if( !(less_than(cur_actual_rate,grey_min) && grtr_than(cur_actual_rate,tr_min)) )
1595 converged_gmn_rmn_tm = 1;
1596 }
1597 }
1598
1599 tr = cur_actual_rate ;
1600 transmission_rate = (l_int32) rint(1000000 * tr) ;
1601 if(Verbose)
1602 printf(" Fleet Parameter(act) :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate, cur_pkt_sz , stream_len,time_interval);
1603 fprintf(pathload_fp," Fleet Parameter(act) :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate,cur_pkt_sz,stream_len,time_interval);
1604 snd_time_interval=0;
1605 num=0;
1606 }
1607
terminate_gracefully(struct timeval exp_start_time)1608 void terminate_gracefully(struct timeval exp_start_time)
1609 {
1610 l_int32 ctr_code;
1611 char ctr_buff[8],buff[26];
1612 struct timeval exp_end_time;
1613 double min=0,max=0 ;
1614
1615 ctr_code = TERMINATE | CTR_CODE;
1616 send_ctr_mesg(ctr_buff, ctr_code);
1617
1618 gettimeofday(&exp_end_time, NULL);
1619 strncpy(buff, ctime(&(exp_end_time.tv_sec)), 24);
1620 buff[24] = '\0';
1621 if (verbose || Verbose)
1622 printf("\n\t***** RESULT *****\n");
1623 fprintf(pathload_fp,"\n\t***** RESULT *****\n");
1624
1625 if (netlog)
1626 netlogger() ;
1627
1628 if ( min_rate_flag )
1629 {
1630 if (verbose || Verbose)
1631 {
1632 printf("Avail-bw < minimum sending rate.\n");
1633 printf("Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n");
1634 }
1635 fprintf(pathload_fp,"Avail-bw < minimum sending rate.\n");
1636 fprintf(pathload_fp,"Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n");
1637 }
1638 else if ( max_rate_flag && !interrupt_coalescence )
1639 {
1640 if (verbose || Verbose)
1641 {
1642 printf("Avail-bw > maximum sending rate.\n");
1643 if ( tr_min)
1644 printf("Avail-bw > %.2f (Mbps)\n", tr_min);
1645 }
1646 fprintf(pathload_fp,"Avail-bw > maximum sending rate.\n");
1647 if ( tr_min)
1648 fprintf(pathload_fp,"Avail-bw > %.2f (Mbps)\n", tr_min);
1649 }
1650 else if (bad_fleet_cs && !interrupt_coalescence)
1651 {
1652 if (verbose || Verbose)
1653 printf("Measurement terminated due to frequent CS @ sender/receiver.\n");
1654 fprintf(pathload_fp,"Measurement terminated due to frequent CS @ sender/receiver.\n");
1655 if ((tr_min&& tr_max) || (grey_min&&grey_max))
1656 {
1657 if ( grey_min&& grey_max)
1658 {
1659 min = grey_min ; max = grey_max ;
1660 }
1661 else
1662 {
1663 min = tr_min ;max = tr_max ;
1664 }
1665 if (verbose || Verbose)
1666 {
1667 printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1668 printf("Measurements finished at %s \n", buff);
1669 printf("Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
1670 }
1671 fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1672 fprintf(pathload_fp,"Measurements finished at %s \n", buff);
1673 fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
1674 }
1675 }
1676 else
1677 {
1678 if ( !interrupt_coalescence && ((converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm ))
1679 {
1680 if (Verbose)
1681 printf("Actual probing rate != desired probing rate.\n");
1682 fprintf(pathload_fp,"Actual probing rate != desired probing rate.\n");
1683 if ( converged_rmn_rmx_tm )
1684 {
1685 min = tr_min ; max = tr_max;
1686 }
1687 else
1688 {
1689 min = grey_min ; max = grey_max ;
1690 }
1691 }
1692 else if ( !interrupt_coalescence && converged_rmn_rmx )
1693 {
1694 if (Verbose)
1695 printf("User specified bandwidth resolution achieved\n");
1696 fprintf(pathload_fp,"User specified bandwidth resolution achieved\n");
1697 min = tr_min ; max = tr_max ;
1698 }
1699 else if ( !interrupt_coalescence && converged_gmn_rmn && converged_gmx_rmx )
1700 {
1701 if (Verbose)
1702 printf("Exiting due to grey bw resolution\n");
1703 fprintf(pathload_fp,"Exiting due to grey bw resolution\n");
1704 min = grey_min ; max = grey_max;
1705 }
1706 else
1707 {
1708 min = tr_min ; max = tr_max;
1709 }
1710
1711 if (verbose||Verbose)
1712 {
1713 if ( lower_bound)
1714 {
1715 printf("Receiver NIC has interrupt coalescence enabled\n");
1716 printf("Available bandwidth is greater than %.2f (Mbps)\n", min);
1717 }
1718 else
1719 printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1720
1721 printf("Measurements finished at %s \n", buff);
1722 printf("Measurement latency is %.2f sec \n",time_to_us_delta(exp_start_time,exp_end_time)/1000000);
1723 }
1724
1725 if ( lower_bound)
1726 {
1727 fprintf(pathload_fp,"Receiver NIC has interrupt coalescence enabled\n");
1728 fprintf(pathload_fp,"Available bandwidth is greater than %.2f (Mbps)\n", min);
1729 }
1730 else
1731 fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
1732
1733 fprintf(pathload_fp,"Measurements finished at %s \n", buff);
1734 fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
1735 }
1736
1737 if (netlog)
1738 fclose(netlog_fp);
1739 fclose(pathload_fp);
1740 close(sock_tcp);
1741 exit(0);
1742 }
1743
netlogger()1744 void netlogger()
1745 {
1746 struct tm *tm;
1747 struct hostent *rcv_host, *snd_host;
1748 char rcv_name[256];
1749 struct timeval curr_time;
1750
1751 gettimeofday(&curr_time,NULL);
1752 tm = gmtime(&curr_time.tv_sec);
1753 fprintf(netlog_fp,"DATE=%4d",tm->tm_year+1900);
1754 print_time(netlog_fp,tm->tm_mon+1);
1755 print_time(netlog_fp,tm->tm_mday);
1756 print_time(netlog_fp,tm->tm_hour);
1757 print_time(netlog_fp,tm->tm_min);
1758 if (tm->tm_sec <10) {
1759 fprintf(netlog_fp,"0");
1760 fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0);
1761 }
1762 else{
1763 fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0);
1764 }
1765 gethostname(rcv_name, 255);
1766 rcv_host = gethostbyname(rcv_name);
1767 if(strcmp(rcv_name, "\0")!=0) fprintf(netlog_fp," HOST=%s",rcv_host->h_name);
1768 else fprintf(netlog_fp," HOST=NO_NAME");
1769 fprintf(netlog_fp," PROG=pathload");
1770 fprintf(netlog_fp," LVL=Usage");
1771 if ((snd_host = gethostbyname(hostname)) == 0) {
1772 snd_host = gethostbyaddr(hostname,256,AF_INET);
1773 }
1774 fprintf(netlog_fp," PATHLOAD.SNDR=%s",snd_host->h_name);
1775 fprintf(netlog_fp," PATHLOAD.ABWL=%.1fMbps",tr_min);
1776 fprintf(netlog_fp," PATHLOAD.ABWH=%.1fMbps\n",tr_max);
1777 fclose(netlog_fp);
1778 }
1779
1780 /* prl_int32 time */
print_time(FILE * fp,l_int32 time)1781 void print_time(FILE *fp, l_int32 time)
1782 {
1783 if( time<10){
1784 fprintf(fp,"0");
1785 fprintf(fp,"%1d",time);
1786 }
1787 else{
1788 fprintf(fp,"%2d",time);
1789 }
1790 }
1791
less_than(double a,double b)1792 l_int32 less_than(double a, double b)
1793 {
1794 if ( !equal(a,b) && a < b)
1795 return 1;
1796 else
1797 return 0;
1798 }
1799
grtr_than(double a,double b)1800 l_int32 grtr_than(double a, double b)
1801 {
1802 if ( !equal(a,b) && a > b)
1803 return 1;
1804 else
1805 return 0;
1806 }
1807
1808 /*
1809 if a approx-equal b, return 1
1810 else 0
1811 */
equal(double a,double b)1812 l_int32 equal(double a , double b)
1813 {
1814 l_int32 maxdiff ;
1815 if ( a<b?a:b < 500 ) maxdiff = 2.5 ;
1816 else maxdiff = 5 ;
1817 if ( abs( a - b ) / b <= .02 && abs(a-b) < maxdiff )
1818 return 1 ;
1819 else
1820 return 0;
1821 }
1822
1823
1824 /*
1825 * Help
1826 * */
help()1827 void help()
1828 {
1829 fprintf(stderr, "usage: pathload_rcv [-q|-v] [-o|-O <filename>] [-N <filename>]\
1830 [-w <bw_resol>] [-h|-H] -s <sender>\n");
1831 fprintf (stderr,"-s : hostname/ipaddress of sender\n");
1832 fprintf (stderr,"-q : quite mode\n");
1833 fprintf (stderr,"-v : verbose mode\n");
1834 fprintf (stderr,"-w : user specified bw resolution\n");
1835 fprintf (stderr,"-o <file> : write log in user specified file [default is pathload.log]\n");
1836 fprintf (stderr,"-O <file> : append log in user specified file [default is pathload.log]\n");
1837 fprintf (stderr,"-N <file> : print output in netlogger format to <file>\n");
1838 fprintf (stderr,"-h|H : print this help and exit\n");
1839 exit(0);
1840 }
1841