1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #ifdef HAVE_WINDOWS_H
7 #include <winsock2.h>
8 #include <windows.h>
9 #endif
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include "mpi.h"
13 #include "GetOpt.h"
14 #include <string.h>
15
/* Fallback boolean type and constants, defined only when no earlier header
 * supplied macros with these names. */
#ifndef BOOL
typedef int BOOL;
#endif
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif

/* Number of timed trials per message size; main() also stores it in args.nbuff. */
#define TRIALS 3
/* Not referenced anywhere in the visible part of this file. */
#define REPEAT 1000
/* Sample-count limit for the main loop; main() lets -reps override it. */
int g_NSAMP = 250;
/* Byte offset used to probe message sizes just below and above each nominal size. */
#define PERT 3
/*#define LATENCYREPS 1000*/
/* Repetition count for the latency and sync-time tests; TestLatency() overwrites
 * it with the value returned by DetermineLatencyReps(). */
int g_LATENCYREPS = 1000;
/* Initial value for the per-size minimum time, larger than any real measurement. */
#define LONGTIME 1e99
/* Bits per byte, used to convert message length to bits. */
#define CHARSIZE 8
/* Not referenced anywhere in the visible part of this file. */
#define PATIENCE 50
/* Target duration in seconds of one timed trial; used to derive the repeat count. */
#define RUNTM 0.25
/* The main loop stops once a single transfer takes this long (seconds); set by -time. */
double g_STOPTM = 0.1;
/* Default upper bound for the message size (-end). */
#define MAXINT 2147483647

/* Function-like helpers. Arguments are evaluated more than once, so they must
 * not have side effects. */
#define ABS(x) (((x) < 0)?(-(x)):(x))
#define MIN(x, y) (((x) < (y))?(x):(y))
#define MAX(x, y) (((x) > (y))?(x):(y))

/* Rank of this process and total process count, filled in by main(). */
int g_nIproc = 0, g_nNproc = 0;
44
/* MPI addressing for one end of the ping-pong pair. Setup() fills both fields. */
typedef struct protocolstruct {
    int nbor;                   /* Rank of the partner process */
    int iproc;                  /* Rank of this process */
} ProtocolStruct;
49
50 typedef struct argstruct ArgStruct;
51 struct argstruct {
52 /* This is the common information that is needed for all tests */
53 char *host; /* Name of receiving host */
54 char *buff; /* Transmitted buffer */
55 char *buff1; /* Transmitted buffer */
56 size_t bufflen; /* Length of transmitted buffer */
57 int tr, /* Transmit flag */
58 nbuff; /* Number of buffers to transmit */
59
60 /* Now we work with a union of information for protocol dependent stuff */
61 ProtocolStruct prot; /* Structure holding necessary info for TCP */
62 };
63
/* One measured point of the bandwidth curve. */
typedef struct data {
    double t;                   /* Best time for one transfer, in seconds */
    double bps;                 /* Throughput derived from bits and t */
    double variance;            /* Variance of the per-trial times */
    int bits;                   /* Message size in bits */
    int repeat;                 /* Repetitions used inside each trial */
} Data;
72
/* Forward declarations for the routines defined below main(). */

/* Timer and per-process initialisation. */
double When(void);
int Setup(ArgStruct * p);
/* Zero-byte handshake between the two processes. */
void Sync(ArgStruct * p);
/* Payload transfer primitives. */
void SendData(ArgStruct * p);
void RecvData(ArgStruct * p);
void SendRecvData(ArgStruct * p);
/* Exchange of either a repeat count (*rpt > 0) or a time value between the processes. */
void SendTime(ArgStruct * p, double *t, int *rpt);
void RecvTime(ArgStruct * p, double *t, int *rpt);
/* Connection hooks; Establish() does nothing here and CleanUp() finalizes MPI. */
int Establish(ArgStruct * p);
int CleanUp(ArgStruct * p);
/* Calibration measurements run before the main loop. */
double TestLatency(ArgStruct * p);
double TestSyncTime(ArgStruct * p);
void PrintOptions(void);
int DetermineLatencyReps(ArgStruct * p);
87
/*
 * Print the command-line usage summary to stdout.
 *
 * The list mirrors the options parsed in main(), including -mb, which the
 * previous text omitted although main() accepts it to report megabytes per
 * second instead of megabits per second.
 */
void PrintOptions(void)
{
    printf("\n");
    printf("Usage: netpipe flags\n");
    printf(" flags:\n");
    printf(" -reps #iterations\n");
    printf(" -time stop_time\n");
    printf(" -start initial_msg_size\n");
    printf(" -end final_msg_size\n");
    printf(" -out outputfile\n");
    printf(" -nocache\n");
    printf(" -headtohead\n");
    printf(" -pert\n");
    printf(" -mb\n");
    printf(" -noprint\n");
    printf(" -onebuffer largest_buffer_size\n");
    printf("Requires exactly two processes\n");
    printf("\n");
}
106
main(int argc,char * argv[])107 int main(int argc, char *argv[])
108 {
109 FILE *out = 0; /* Output data file */
110 char s[255]; /* Generic string */
111 char *memtmp;
112 char *memtmp1;
113
114 int i, j, n, nq, /* Loop indices */
115 bufoffset = 0, /* Align buffer to this */
116 bufalign = 16 * 1024, /* Boundary to align buffer to */
117 nrepeat, /* Number of time to do the transmission */
118 nzero = 0, inc = 1, /* Increment value */
119 detailflag = 0, /* Set to examine the signature curve detail */
120 pert, /* Perturbation value */
121 ipert, /* index of the perturbation loop */
122 start = 0, /* Starting value for signature curve */
123 end = MAXINT, /* Ending value for signature curve */
124 streamopt = 0, /* Streaming mode flag */
125 printopt = 1; /* Debug print statements flag */
126 int one_buffer = 0;
127 int onebuffersize = 100 * 1024 * 1024;
128 int quit = 0;
129 size_t len; /* Number of bytes to be transmitted */
130
131 ArgStruct args; /* Argumentsfor all the calls */
132
133 double t, t0, t1, t2, /* Time variables */
134 tlast, /* Time for the last transmission */
135 tzero = 0, latency, /* Network message latency */
136 synctime; /* Network synchronization time */
137
138 Data *bwdata; /* Bandwidth curve data */
139
140 BOOL bNoCache = FALSE;
141 BOOL bHeadToHead = FALSE;
142 BOOL bSavePert = FALSE;
143 BOOL bUseMegaBytes = FALSE;
144
145 MPI_Init(&argc, &argv);
146
147 MPI_Comm_size(MPI_COMM_WORLD, &g_nNproc);
148 MPI_Comm_rank(MPI_COMM_WORLD, &g_nIproc);
149
150 if (g_nNproc != 2) {
151 if (g_nIproc == 0)
152 PrintOptions();
153 MPI_Finalize();
154 exit(0);
155 }
156
157 GetOptDouble(&argc, &argv, "-time", &g_STOPTM);
158 GetOptInt(&argc, &argv, "-reps", &g_NSAMP);
159 GetOptInt(&argc, &argv, "-start", &start);
160 GetOptInt(&argc, &argv, "-end", &end);
161 one_buffer = GetOptInt(&argc, &argv, "-onebuffer", &onebuffersize);
162 if (one_buffer) {
163 if (onebuffersize < 1) {
164 one_buffer = 0;
165 } else {
166 onebuffersize += bufalign;
167 }
168 }
169 bNoCache = GetOpt(&argc, &argv, "-nocache");
170 bHeadToHead = GetOpt(&argc, &argv, "-headtohead");
171 bUseMegaBytes = GetOpt(&argc, &argv, "-mb");
172 if (GetOpt(&argc, &argv, "-noprint"))
173 printopt = 0;
174 bSavePert = GetOpt(&argc, &argv, "-pert");
175
176 bwdata = malloc((g_NSAMP + 1) * sizeof(Data));
177
178 if (g_nIproc == 0)
179 strcpy(s, "Netpipe.out");
180 GetOptString(&argc, &argv, "-out", s);
181
182 if (start > end) {
183 fprintf(stdout, "Start MUST be LESS than end\n");
184 exit(420132);
185 }
186
187 args.nbuff = TRIALS;
188
189 Setup(&args);
190 Establish(&args);
191
192 if (args.tr) {
193 if ((out = fopen(s, "w")) == NULL) {
194 fprintf(stdout, "Can't open %s for output\n", s);
195 exit(1);
196 }
197 }
198
199 latency = TestLatency(&args);
200 synctime = TestSyncTime(&args);
201
202
203 if (args.tr) {
204 SendTime(&args, &latency, &nzero);
205 } else {
206 RecvTime(&args, &latency, &nzero);
207 }
208 if (args.tr && printopt) {
209 printf("Latency: %0.9f\n", latency);
210 fflush(stdout);
211 printf("Sync Time: %0.9f\n", synctime);
212 fflush(stdout);
213 printf("Now starting main loop\n");
214 fflush(stdout);
215 }
216 tlast = latency;
217 inc = (start > 1 && !detailflag) ? start / 2 : inc;
218 args.bufflen = start;
219
220 if (one_buffer) {
221 args.buff = (char *) malloc(onebuffersize);
222 args.buff1 = (char *) malloc(onebuffersize);
223 }
224
225 /* Main loop of benchmark */
226 for (nq = n = 0, len = start;
227 n < g_NSAMP && tlast < g_STOPTM && len <= end && !quit; len = len + inc, nq++) {
228 if (nq > 2 && !detailflag)
229 inc = ((nq % 2)) ? inc + inc : inc;
230
231 /* This is a perturbation loop to test nearby values */
232 for (ipert = 0, pert = (!detailflag && inc > PERT + 1) ? -PERT : 0;
233 pert <= PERT && !quit;
234 ipert++, n++, pert += (!detailflag && inc > PERT + 1) ? PERT : PERT + 1) {
235
236 /* Calculate howmany times to repeat the experiment. */
237 if (args.tr) {
238 if (args.bufflen == 0)
239 nrepeat = g_LATENCYREPS;
240 else
241 nrepeat = (int) (MAX((RUNTM / ((double) args.bufflen /
242 (args.bufflen - inc + 1.0) * tlast)), TRIALS));
243 SendTime(&args, &tzero, &nrepeat);
244 } else {
245 nrepeat = 1; /* Just needs to be greater than zero */
246 RecvTime(&args, &tzero, &nrepeat);
247 }
248
249 /* Allocate the buffer */
250 args.bufflen = len + pert;
251 if (one_buffer) {
252 if (bNoCache) {
253 if (args.bufflen * nrepeat + bufalign > onebuffersize) {
254 fprintf(stdout, "Exceeded user specified buffer size\n");
255 fflush(stdout);
256 quit = 1;
257 break;
258 }
259 } else {
260 if (args.bufflen + bufalign > onebuffersize) {
261 fprintf(stdout, "Exceeded user specified buffer size\n");
262 fflush(stdout);
263 quit = 1;
264 break;
265 }
266 }
267 } else {
268 /* printf("allocating %d bytes\n",
269 * args.bufflen * nrepeat + bufalign); */
270 if (bNoCache) {
271 if ((args.buff =
272 (char *) malloc(args.bufflen * nrepeat + bufalign)) == (char *) NULL) {
273 fprintf(stdout, "Couldn't allocate memory\n");
274 fflush(stdout);
275 break;
276 }
277 } else {
278 if ((args.buff = (char *) malloc(args.bufflen + bufalign)) == (char *) NULL) {
279 fprintf(stdout, "Couldn't allocate memory\n");
280 fflush(stdout);
281 break;
282 }
283 }
284 /* if ((args.buff1 = (char *)malloc(args.bufflen * nrepeat + bufalign)) == (char *)NULL) */
285 if ((args.buff1 = (char *) malloc(args.bufflen + bufalign)) == (char *) NULL) {
286 fprintf(stdout, "Couldn't allocate memory\n");
287 fflush(stdout);
288 break;
289 }
290 }
291 /* Possibly align the data buffer */
292 memtmp = args.buff;
293 memtmp1 = args.buff1;
294
295 if (!bNoCache) {
296 if (bufalign != 0) {
297 args.buff +=
298 (bufalign - ((MPI_Aint) args.buff % bufalign) + bufoffset) % bufalign;
299 /* args.buff1 += (bufalign - ((MPI_Aint)args.buff1 % bufalign) + bufoffset) % bufalign; */
300 }
301 }
302 args.buff1 += (bufalign - ((MPI_Aint) args.buff1 % bufalign) + bufoffset) % bufalign;
303
304 if (args.tr && printopt) {
305 fprintf(stdout, "%3d: %9zu bytes %4d times --> ", n, args.bufflen, nrepeat);
306 fflush(stdout);
307 }
308
309 /* Finally, we get to transmit or receive and time */
310 if (args.tr) {
311 bwdata[n].t = LONGTIME;
312 t2 = t1 = 0;
313 for (i = 0; i < TRIALS; i++) {
314 if (bNoCache) {
315 if (bufalign != 0) {
316 args.buff =
317 memtmp +
318 ((bufalign - ((MPI_Aint) args.buff % bufalign) +
319 bufoffset) % bufalign);
320 /* args.buff1 = memtmp1 + ((bufalign - ((MPI_Aint)args.buff1 % bufalign) + bufoffset) % bufalign); */
321 } else {
322 args.buff = memtmp;
323 /* args.buff1 = memtmp1; */
324 }
325 }
326
327 Sync(&args);
328 t0 = When();
329 for (j = 0; j < nrepeat; j++) {
330 if (bHeadToHead)
331 SendRecvData(&args);
332 else {
333 SendData(&args);
334 if (!streamopt) {
335 RecvData(&args);
336 }
337 }
338 if (bNoCache) {
339 args.buff += args.bufflen;
340 /* args.buff1 += args.bufflen; */
341 }
342 }
343 t = (When() - t0) / ((1 + !streamopt) * nrepeat);
344
345 if (!streamopt) {
346 t2 += t * t;
347 t1 += t;
348 bwdata[n].t = MIN(bwdata[n].t, t);
349 }
350 }
351 if (!streamopt)
352 SendTime(&args, &bwdata[n].t, &nzero);
353 else
354 RecvTime(&args, &bwdata[n].t, &nzero);
355
356 if (!streamopt)
357 bwdata[n].variance = t2 / TRIALS - t1 / TRIALS * t1 / TRIALS;
358
359 } else {
360 bwdata[n].t = LONGTIME;
361 t2 = t1 = 0;
362 for (i = 0; i < TRIALS; i++) {
363 if (bNoCache) {
364 if (bufalign != 0) {
365 args.buff =
366 memtmp +
367 ((bufalign - ((MPI_Aint) args.buff % bufalign) +
368 bufoffset) % bufalign);
369 /* args.buff1 = memtmp1 + ((bufalign - ((MPI_Aint)args.buff1 % bufalign) + bufoffset) % bufalign); */
370 } else {
371 args.buff = memtmp;
372 /* args.buff1 = memtmp1; */
373 }
374 }
375
376 Sync(&args);
377 t0 = When();
378 for (j = 0; j < nrepeat; j++) {
379 if (bHeadToHead)
380 SendRecvData(&args);
381 else {
382 RecvData(&args);
383 if (!streamopt)
384 SendData(&args);
385 }
386 if (bNoCache) {
387 args.buff += args.bufflen;
388 /* args.buff1 += args.bufflen; */
389 }
390 }
391 t = (When() - t0) / ((1 + !streamopt) * nrepeat);
392
393 if (streamopt) {
394 t2 += t * t;
395 t1 += t;
396 bwdata[n].t = MIN(bwdata[n].t, t);
397 }
398 }
399 if (streamopt)
400 SendTime(&args, &bwdata[n].t, &nzero);
401 else
402 RecvTime(&args, &bwdata[n].t, &nzero);
403
404 if (streamopt)
405 bwdata[n].variance = t2 / TRIALS - t1 / TRIALS * t1 / TRIALS;
406 }
407 tlast = bwdata[n].t;
408 bwdata[n].bits = args.bufflen * CHARSIZE;
409 bwdata[n].bps = bwdata[n].bits / (bwdata[n].t * 1024 * 1024);
410 bwdata[n].repeat = nrepeat;
411
412 if (args.tr) {
413 if (bSavePert) {
414 /* fprintf(out,"%f\t%f\t%d\t%d\t%f\n", bwdata[n].t, bwdata[n].bps,
415 * bwdata[n].bits, bwdata[n].bits / 8, bwdata[n].variance); */
416 if (bUseMegaBytes)
417 fprintf(out, "%d\t%f\t%0.9f\n", bwdata[n].bits / 8, bwdata[n].bps / 8,
418 bwdata[n].t);
419 else
420 fprintf(out, "%d\t%f\t%0.9f\n", bwdata[n].bits / 8, bwdata[n].bps,
421 bwdata[n].t);
422 fflush(out);
423 }
424 }
425 if (!one_buffer) {
426 free(memtmp);
427 free(memtmp1);
428 }
429 if (args.tr && printopt) {
430 if (bUseMegaBytes)
431 fprintf(stdout, " %6.2f MBps in %0.9f sec\n", bwdata[n].bps / 8, tlast);
432 else
433 fprintf(stdout, " %6.2f Mbps in %0.9f sec\n", bwdata[n].bps, tlast);
434 fflush(stdout);
435 }
436 } /* End of perturbation loop */
437 if (!bSavePert && args.tr) {
438 /* if we didn't save all of the perturbation loops, find the max and save it */
439 int index = 1;
440 double dmax = bwdata[n - 1].bps;
441 for (; ipert > 1; ipert--) {
442 if (bwdata[n - ipert].bps > dmax) {
443 index = ipert;
444 dmax = bwdata[n - ipert].bps;
445 }
446 }
447 if (bUseMegaBytes)
448 fprintf(out, "%d\t%f\t%0.9f\n", bwdata[n - index].bits / 8,
449 bwdata[n - index].bps / 8, bwdata[n - index].t);
450 else
451 fprintf(out, "%d\t%f\t%0.9f\n", bwdata[n - index].bits / 8, bwdata[n - index].bps,
452 bwdata[n - index].t);
453 fflush(out);
454 }
455 } /* End of main loop */
456
457 if (args.tr)
458 fclose(out);
459 /* THE_END: */
460 CleanUp(&args);
461 free(bwdata);
462 return 0;
463 }
464
465
/* Wall-clock timestamp in seconds as a double, taken from MPI_Wtime(). */
double When()
{
    const double now = MPI_Wtime();

    return now;
}
471
/*
 * Fill in the MPI-related fields of *p and announce this process.
 *
 * Stores the caller's rank in p->prot.iproc, prints "<rank>: <processor name>"
 * to stdout, and assigns roles: rank 0 becomes the transmitter (p->tr = 1)
 * with partner rank 1; any other rank becomes the receiver (p->tr = 0) with
 * partner rank 0.
 *
 * Exits the process with status -2 when fewer than two processes are running.
 * Returns 1 otherwise.
 */
int Setup(ArgStruct * p)
{
    int nproc;
    /* MPI_Get_processor_name() requires room for MPI_MAX_PROCESSOR_NAME
     * characters; a fixed 255-byte array is too small on implementations
     * where that constant is larger. */
    char s[MPI_MAX_PROCESSOR_NAME];
    int len = 0;                /* Receives the length of the returned name */

    MPI_Comm_rank(MPI_COMM_WORLD, &p->prot.iproc);
    MPI_Comm_size(MPI_COMM_WORLD, &nproc);

    MPI_Get_processor_name(s, &len);
    printf("%d: %s\n", p->prot.iproc, s);
    fflush(stdout);

    if (p->prot.iproc == 0)
        p->prot.nbor = 1;
    else
        p->prot.nbor = 0;

    if (nproc < 2) {
        printf("Need two processes\n");
        printf("nproc: %i\n", nproc);
        exit(-2);
    }

    if (p->prot.iproc == 0)
        p->tr = 1;
    else
        p->tr = 0;
    return 1;
}
503
/*
 * Three-step handshake with the partner using zero-length messages on tag 1.
 * The transmitter does send / receive / send; the receiver mirrors it with
 * receive / send / receive.
 */
void Sync(ArgStruct * p)
{
    const int peer = p->prot.nbor;
    char token;                 /* Never transferred: every message has count 0 */
    MPI_Status st;

    if (!p->tr) {
        MPI_Recv(&token, 0, MPI_BYTE, peer, 1, MPI_COMM_WORLD, &st);
        MPI_Send(&token, 0, MPI_BYTE, peer, 1, MPI_COMM_WORLD);
        MPI_Recv(&token, 0, MPI_BYTE, peer, 1, MPI_COMM_WORLD, &st);
        return;
    }

    MPI_Send(&token, 0, MPI_BYTE, peer, 1, MPI_COMM_WORLD);
    MPI_Recv(&token, 0, MPI_BYTE, peer, 1, MPI_COMM_WORLD, &st);
    MPI_Send(&token, 0, MPI_BYTE, peer, 1, MPI_COMM_WORLD);
}
518
/*
 * Choose a repetition count for the latency tests.
 *
 * Handshakes are run in rounds; each round adds enough Sync() calls to bring
 * the cumulative total to 1, 2, 4, 8, ... and adds the round's elapsed time to
 * a running sum. Rounds continue while the sum is below 1 second, or below 3
 * seconds with the target still under 1000. After every round rank 0 sends its
 * running sum to the partner, so both processes stop in the same round.
 *
 * Returns the target after its final doubling, which is twice the number of
 * handshakes actually performed.
 */
int DetermineLatencyReps(ArgStruct * p)
{
    MPI_Status st;
    double elapsed = 0;
    int target = 1;             /* Cumulative handshake count the next round aims for */
    int done = 0;               /* Cumulative handshake count reached so far */
    int k;

    /* prime the send/receive pipes */
    for (k = 0; k < 3; k++)
        Sync(p);

    /* three timer calls whose results are discarded */
    for (k = 0; k < 3; k++)
        (void) When();

    while ((elapsed < 1) || (elapsed < 3 && target < 1000)) {
        const double begin = When();

        for (k = target - done; k > 0; k--)
            Sync(p);
        elapsed += When() - begin;

        done = target;
        target *= 2;

        /* use duration from the root only */
        if (p->prot.iproc != 0)
            MPI_Recv(&elapsed, 1, MPI_DOUBLE, p->prot.nbor, 2, MPI_COMM_WORLD, &st);
        else
            MPI_Send(&elapsed, 1, MPI_DOUBLE, p->prot.nbor, 2, MPI_COMM_WORLD);
    }

    return target;
}
555
/*
 * Measure the one-way latency of an empty message.
 *
 * Stores the result of DetermineLatencyReps() in g_LATENCYREPS (rank 0
 * reports it when it is below 1024), resets the buffers in *p to NULL with
 * length 0, and times g_LATENCYREPS zero-byte round trips.
 *
 * Returns the elapsed time divided by twice the number of round trips.
 */
double TestLatency(ArgStruct * p)
{
    double begin;
    int k;

    g_LATENCYREPS = DetermineLatencyReps(p);
    if (p->prot.iproc == 0 && g_LATENCYREPS < 1024) {
        printf("Using %d reps to determine latency\n", g_LATENCYREPS);
        fflush(stdout);
    }

    /* Zero-length messages need no storage. */
    p->buff = NULL;
    p->buff1 = NULL;
    p->bufflen = 0;

    Sync(p);

    /* three timer calls whose results are discarded, then the real start */
    for (k = 0; k < 3; k++)
        (void) When();
    begin = When();

    if (p->tr) {
        for (k = 0; k < g_LATENCYREPS; k++) {
            SendData(p);
            RecvData(p);
        }
    } else {
        for (k = 0; k < g_LATENCYREPS; k++) {
            RecvData(p);
            SendData(p);
        }
    }

    return (When() - begin) / (2 * g_LATENCYREPS);
}
592
/*
 * Measure the average cost of one Sync() handshake.
 * Returns the time taken by g_LATENCYREPS handshakes divided by g_LATENCYREPS.
 */
double TestSyncTime(ArgStruct * p)
{
    double begin;
    int k;

    /* five timer calls whose results are discarded, then the real start */
    for (k = 0; k < 5; k++)
        (void) When();
    begin = When();

    k = 0;
    while (k < g_LATENCYREPS) {
        Sync(p);
        k++;
    }

    return (When() - begin) / g_LATENCYREPS;
}
610
/*
 * Exchange one message with the partner in both directions at once.
 * The receive into p->buff1 is posted first as a non-blocking request, then
 * p->buff is sent with a blocking send, and finally the receive is completed.
 * Both transfers use tag 1 and p->bufflen bytes.
 */
void SendRecvData(ArgStruct * p)
{
    MPI_Request pending;
    MPI_Status st;
    const int peer = p->prot.nbor;
    const int nbytes = (int) p->bufflen;

    MPI_Irecv(p->buff1, nbytes, MPI_BYTE, peer, 1, MPI_COMM_WORLD, &pending);
    MPI_Send(p->buff, nbytes, MPI_BYTE, peer, 1, MPI_COMM_WORLD);
    MPI_Wait(&pending, &st);
}
627
/* Blocking send of p->bufflen bytes from p->buff to the partner, tag 1. */
void SendData(ArgStruct * p)
{
    const int peer = p->prot.nbor;
    const int nbytes = (int) p->bufflen;

    MPI_Send(p->buff, nbytes, MPI_BYTE, peer, 1, MPI_COMM_WORLD);
}
632
/* Blocking receive of p->bufflen bytes from the partner into p->buff1, tag 1. */
void RecvData(ArgStruct * p)
{
    const int peer = p->prot.nbor;
    const int nbytes = (int) p->bufflen;
    MPI_Status st;

    MPI_Recv(p->buff1, nbytes, MPI_BYTE, peer, 1, MPI_COMM_WORLD, &st);
}
638
639
/*
 * Send one control value to the partner on tag 2.
 * A positive *rpt is sent as an int; otherwise *t is sent as a double.
 */
void SendTime(ArgStruct * p, double *t, int *rpt)
{
    const int peer = p->prot.nbor;

    if (*rpt <= 0) {
        MPI_Send(t, 1, MPI_DOUBLE, peer, 2, MPI_COMM_WORLD);
        return;
    }

    MPI_Send(rpt, 1, MPI_INT, peer, 2, MPI_COMM_WORLD);
}
647
/*
 * Receive one control value from the partner on tag 2.
 * The value of *rpt on entry selects what is expected: when positive an int
 * is received into *rpt, otherwise a double is received into *t.
 */
void RecvTime(ArgStruct * p, double *t, int *rpt)
{
    const int peer = p->prot.nbor;
    MPI_Status st;

    if (*rpt <= 0) {
        MPI_Recv(t, 1, MPI_DOUBLE, peer, 2, MPI_COMM_WORLD, &st);
        return;
    }

    MPI_Recv(rpt, 1, MPI_INT, peer, 2, MPI_COMM_WORLD, &st);
}
656
/* Connection hook: performs no work in this file. Always returns 1. */
int Establish(ArgStruct * p)
{
    (void) p;                   /* unused */

    return 1;
}
661
/* Shut down MPI via MPI_Finalize(). Always returns 1; p is not used. */
int CleanUp(ArgStruct * p)
{
    (void) p;                   /* unused */

    MPI_Finalize();

    return 1;
}
668