/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

/* This program provides a simple test of send-receive performance between
   two (or more) processes.  This is sometimes called a head-to-head or
   ping-ping test, as both processes send at the same time.
*/
10 
11 #include "mpi.h"
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <assert.h>
15 #include "mpitest.h"
16 #include "mpithreadtest.h"
17 
18 #define CACHELINE_SIZE 64
19 
20 #define MESSAGE_SIZE 8
21 #define NUM_MESSAGES 64000
22 #define WINDOW_SIZE 64
23 
24 #define ERROR_MARGIN 0.05       /* FIXME: a better margin? */
25 
26 MPI_Comm *thread_comms;
27 double *t_elapsed;
28 
29 MTEST_THREAD_RETURN_TYPE thread_fn(void *arg);
30 
thread_fn(void * arg)31 MTEST_THREAD_RETURN_TYPE thread_fn(void *arg)
32 {
33     int error;
34     int tid;
35     MPI_Comm my_comm;
36     int rank;
37     int win_i, win_post_i, win_posts;
38     void *buf;
39     int sync_buf;
40     MPI_Request requests[WINDOW_SIZE];
41     MPI_Status statuses[WINDOW_SIZE];
42     double t_start, t_end;
43 
44     tid = (int) (long) arg;
45     my_comm = thread_comms[tid];
46     MPI_Comm_rank(my_comm, &rank);
47 
48     win_posts = NUM_MESSAGES / WINDOW_SIZE;
49     assert(win_posts * WINDOW_SIZE == NUM_MESSAGES);
50 
51     /* Allocate a cache-aligned buffer to prevent potential effects of serialization:
52      * either false-sharing on the CPU or serialization in the NIC's parallel TLB
53      * engine
54      */
55     error = posix_memalign(&buf, CACHELINE_SIZE, MESSAGE_SIZE * sizeof(char));
56     if (error) {
57         fprintf(stderr, "Thread %d: Error in allocating send buffer\n", tid);
58     }
59 
60     /* Benchmark */
61     t_start = MPI_Wtime();
62 
63     for (win_post_i = 0; win_post_i < win_posts; win_post_i++) {
64         for (win_i = 0; win_i < WINDOW_SIZE; win_i++) {
65             if (rank == 0) {
66                 MPI_Isend(buf, MESSAGE_SIZE, MPI_CHAR, 1, tid, my_comm, &requests[win_i]);
67             } else {
68                 MPI_Irecv(buf, MESSAGE_SIZE, MPI_CHAR, 0, tid, my_comm, &requests[win_i]);
69             }
70         }
71         MPI_Waitall(WINDOW_SIZE, requests, statuses);
72     }
73 
74     /* Sync */
75     if (rank == 0) {
76         MPI_Recv(&sync_buf, 1, MPI_INT, 1, tid, my_comm, MPI_STATUS_IGNORE);
77     } else {
78         MPI_Send(&sync_buf, 1, MPI_INT, 0, tid, my_comm);
79     }
80 
81     if (rank == 0) {
82         t_end = MPI_Wtime();
83         t_elapsed[tid] = t_end - t_start;
84     }
85 
86     free(buf);
87 }
88 
89 
main(int argc,char * argv[])90 int main(int argc, char *argv[])
91 {
92     int rank, size;
93     int provided;
94     int num_threads;
95     double onethread_msg_rate, multithread_msg_rate;
96     int errors;
97     MPI_Info info;
98 
99     if (argc > 2) {
100         fprintf(stderr, "Can support at most only the -nthreads argument.\n");
101         MPI_Abort(MPI_COMM_WORLD, 1);
102     }
103 
104     MTest_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
105 
106     if (provided != MPI_THREAD_MULTIPLE) {
107         fprintf(stderr, "MPI_THREAD_MULTIPLE required for this test.\n");
108         MPI_Abort(MPI_COMM_WORLD, 1);
109     }
110 
111     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
112     MPI_Comm_size(MPI_COMM_WORLD, &size);
113     if (size != 2) {
114         fprintf(stderr, "please run with exactly two processes.\n");
115         MPI_Abort(MPI_COMM_WORLD, 1);
116     }
117 
118     errors = MTest_thread_barrier_init();
119     if (errors) {
120         fprintf(stderr, "Could not create thread barrier\n");
121         MPI_Abort(MPI_COMM_WORLD, 1);
122     }
123 
124     MTestArgList *head = MTestArgListCreate(argc, argv);
125     num_threads = MTestArgListGetInt(head, "nthreads");
126     MTestArgListDestroy(head);
127 
128     thread_comms = (MPI_Comm *) malloc(sizeof(MPI_Comm) * num_threads);
129     t_elapsed = calloc(num_threads, sizeof(double));
130 
131     /* Create a communicator per thread */
132     MPI_Info_create(&info);
133     MPI_Info_set(info, "mpi_assert_new_vci", "true");
134     for (int i = 0; i < num_threads; i++) {
135         MPI_Comm_dup_with_info(MPI_COMM_WORLD, info, &thread_comms[i]);
136     }
137 
138     /* Run test with 1 thread */
139     thread_fn((void *) 0);
140     onethread_msg_rate = ((double) NUM_MESSAGES / t_elapsed[0]) / 1e6;
141 
142     /* Run test with multiple threads */
143     for (int i = 1; i < num_threads; i++) {
144         MTest_Start_thread(thread_fn, (void *) (long) i);
145     }
146     thread_fn((void *) 0);
147 
148     MTest_Join_threads();
149 
150     MTest_thread_barrier_free();
151 
152     /* Calculate message rate with multiple threads */
153     if (rank == 0) {
154         MTestPrintfMsg(1, "Number of messages: %d\n", NUM_MESSAGES);
155         MTestPrintfMsg(1, "Message size: %d\n", MESSAGE_SIZE);
156         MTestPrintfMsg(1, "Window size: %d\n", WINDOW_SIZE);
157         MTestPrintfMsg(1, "Mmsgs/s with one thread: %-10.2f\n\n", onethread_msg_rate);
158         MTestPrintfMsg(1, "%-10s\t%-10s\t%-10s\n", "Thread", "Mmsgs/s", "Error");
159 
160         multithread_msg_rate = 0;
161         errors = 0;
162         for (int tid = 0; tid < num_threads; tid++) {
163             double my_msg_rate = ((double) NUM_MESSAGES / t_elapsed[tid]) / 1e6;
164             int my_error = 0;
165             if ((1 - (my_msg_rate / onethread_msg_rate)) > ERROR_MARGIN) {
166                 /* Erroneous */
167                 errors++;
168                 my_error = 1;
169                 fprintf(stderr,
170                         "Thread %d message rate below threshold: %.2f / %.2f = %.2f (threshold = %.2f)\n",
171                         tid, my_msg_rate, onethread_msg_rate, (my_msg_rate / onethread_msg_rate),
172                         ERROR_MARGIN);
173             }
174             MTestPrintfMsg(1, "%-10d\t%-10.2f\t%-10d\n", tid, my_msg_rate, my_error);
175             multithread_msg_rate += my_msg_rate;
176         }
177         MTestPrintfMsg(1, "\n%-10s\t%-10s\t%-10s\t%-10s\n", "Size", "Threads", "Mmsgs/s", "Errors");
178         MTestPrintfMsg(1, "%-10d\t%-10d\t%-10.2f\t%-10d\n", MESSAGE_SIZE, num_threads,
179                        multithread_msg_rate, errors);
180     }
181 
182     for (int i = 0; i < num_threads; i++) {
183         MPI_Comm_free(&thread_comms[i]);
184     }
185     MPI_Info_free(&info);
186     free(thread_comms);
187     free(t_elapsed);
188 
189     MTest_Finalize(errors);
190 
191     return 0;
192 }
193