/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *
 *  (C) 2003 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
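
/*
 * Multi-threaded point-to-point message-rate/latency benchmark.  Pairs
 * of ranks exchange windows of messages from up to MAX_THREADS threads
 * per process; average latency and aggregate message rate are reported
 * when MPITEST_VERBOSE is set.  Requires an even number of processes,
 * e.g. "mpiexec -n 2 ./<test binary>" (the binary name depends on the
 * build).
 */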

#include "mpi.h"
#include "mpitest.h"
#include "mpithreadtest.h"
#include <stdio.h>
#include <stdlib.h>

#define MAX_THREADS 4
/* #define LOOPS 10000 */
#define LOOPS 1000              /* outer iterations per thread */
#define WINDOW 16               /* messages in flight per iteration */
/* #define MAX_MSG_SIZE 16384 */
#define MAX_MSG_SIZE 4096       /* buffer size; the size sweep stays below this */
#define HOP 4                   /* multiplicative step for the message-size sweep */

/* Emulated thread private storage */
struct tp {
    int thread_id;
    int use_proc_null;
    int use_blocking_comm;
    int msg_size;
    double latency;
} tp[MAX_THREADS];

int size, rank;
/* The send/receive buffers are deliberately shared by all threads: the
 * test measures message rate, not data integrity. */
char sbuf[MAX_MSG_SIZE], rbuf[MAX_MSG_SIZE];
static int verbose = 0;
static volatile int num_threads;
static MTEST_THREAD_LOCK_TYPE num_threads_lock;

#define ABORT_MSG(msg_) do { printf("%s", (msg_)); MPI_Abort(MPI_COMM_WORLD, 1); } while (0)
MTEST_THREAD_RETURN_TYPE run_test(void *arg);
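
/* Benchmark kernel, executed concurrently by every thread.  Even ranks
 * send and odd ranks receive LOOPS windows of WINDOW messages each,
 * using blocking or non-blocking calls according to the thread's tp[]
 * settings, and record the per-message latency in microseconds. */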
MTEST_THREAD_RETURN_TYPE run_test(void *arg)
{
    int thread_id = (int)(long) arg;
    int i, j, peer;
    MPI_Status status[WINDOW];
    MPI_Request req[WINDOW];
    double start, end;
    int err;
    int local_num_threads = -1;

    if (tp[thread_id].use_proc_null)
        peer = MPI_PROC_NULL;
    else
        peer = (rank % 2) ? rank - 1 : rank + 1;

    /* Snapshot num_threads under the lock so the volatile shared counter
     * is never read without synchronization. */
    err = MTest_thread_lock(&num_threads_lock);
    if (err) ABORT_MSG("unable to acquire lock, aborting\n");
    local_num_threads = num_threads;
    err = MTest_thread_unlock(&num_threads_lock);
    if (err) ABORT_MSG("unable to release lock, aborting\n");

    MTest_thread_barrier(local_num_threads);

    start = MPI_Wtime();

    if (tp[thread_id].use_blocking_comm) {
        if ((rank % 2) == 0) {
            for (i = 0; i < LOOPS; i++)
                for (j = 0; j < WINDOW; j++)
                    MPI_Send(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD);
        }
        else {
            for (i = 0; i < LOOPS; i++)
                for (j = 0; j < WINDOW; j++)
                    MPI_Recv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                             &status[0]);
        }
    }
    else {
        for (i = 0; i < LOOPS; i++) {
            if ((rank % 2) == 0) {
                for (j = 0; j < WINDOW; j++)
                    MPI_Isend(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                              &req[j]);
            }
            else {
                for (j = 0; j < WINDOW; j++)
                    MPI_Irecv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                              &req[j]);
            }
            MPI_Waitall(WINDOW, req, status);
        }
    }

    end = MPI_Wtime();
    /* Per-message latency in microseconds */
    tp[thread_id].latency = 1000000.0 * (end - start) / (LOOPS * WINDOW);

    MTest_thread_barrier(local_num_threads);
    return MTEST_THREAD_RETVAL_IGN;
}

void loops(void);
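
/* Runs the benchmark with 1..MAX_THREADS threads.  The main thread
 * participates as thread 0; if additional threads cannot be created,
 * the run continues with however many threads were started.
 * Per-thread latencies are averaged locally and then reduced across
 * all ranks. */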
void loops(void)
{
    int i, nt;
    double latency, mrate, avg_latency, agg_mrate;
    int err;

    err = MTest_thread_lock_create(&num_threads_lock);
    if (err) ABORT_MSG("unable to create lock, aborting\n");

    for (nt = 1; nt <= MAX_THREADS; nt++) {
        err = MTest_thread_lock(&num_threads_lock);
        if (err) ABORT_MSG("unable to acquire lock, aborting\n");

        num_threads = 1;
        MPI_Barrier(MPI_COMM_WORLD);
        MTest_thread_barrier_init();

        for (i = 1; i < nt; i++) {
            err = MTest_Start_thread(run_test, (void *)(long)i);
            if (err) {
                /* attempt to continue with fewer threads, we may be on a
                 * thread-constrained platform like BG/P in DUAL mode */
                break;
            }
            ++num_threads;
        }

        err = MTest_thread_unlock(&num_threads_lock);
        if (err) ABORT_MSG("unable to release lock, aborting\n");

        if (nt > 1 && num_threads <= 1) {
            ABORT_MSG("unable to create any additional threads, aborting\n");
        }

        run_test((void *) 0); /* we are thread 0 */
        err = MTest_Join_threads();
        if (err) {
            printf("error joining threads, err=%d\n", err);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }

        MTest_thread_barrier_free();

        latency = 0;
        for (i = 0; i < num_threads; i++)
            latency += tp[i].latency;
        latency /= num_threads; /* average per-message latency in usec */
        mrate = num_threads / latency; /* aggregate message rate (messages/usec) */

        /* Global latency and message rate */
        MPI_Reduce(&latency, &avg_latency, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
        avg_latency /= size;
        MPI_Reduce(&mrate, &agg_mrate, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

        if (!rank && verbose) {
            printf("Threads: %d; Latency: %.3f; Mrate: %.3f\n",
                   num_threads, avg_latency, agg_mrate);
        }
    }

    err = MTest_thread_lock_free(&num_threads_lock);
    if (err) ABORT_MSG("unable to free lock, aborting\n");
}
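/* The test requires MPI_THREAD_MULTIPLE and an even number of processes
 * (each even rank pairs with the next odd rank).  It runs three phases:
 * MPI_PROC_NULL targets (no actual communication), blocking send/recv,
 * and non-blocking send/recv, sweeping message sizes 0, 1, HOP, HOP^2,
 * ... below MAX_MSG_SIZE. */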
int main(int argc, char **argv)
{
    int pmode, i, j;

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &pmode);
    if (pmode != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "MPI_THREAD_MULTIPLE is not supported by the MPI implementation\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (getenv("MPITEST_VERBOSE"))
        verbose = 1;

    /* For communication, we need an even number of processes */
    if (size % 2) {
        fprintf(stderr, "This test needs an even number of processes\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    /* Phase 1: MPI_PROC_NULL targets (no actual communication) */
    for (i = 0; i < MAX_THREADS; i++) {
        tp[i].thread_id = i;
        tp[i].use_proc_null = 1;
        tp[i].use_blocking_comm = 1;
        tp[i].msg_size = 0;
    }
    if (!rank && verbose) {
        printf("\nUsing MPI_PROC_NULL\n");
        printf("-------------------\n");
    }
    loops();

    /* Phase 2: blocking communication; sizes 0, 1, HOP, HOP^2, ... */
    for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
        for (i = 0; i < MAX_THREADS; i++) {
            tp[i].thread_id = i;
            tp[i].use_proc_null = 0;
            tp[i].use_blocking_comm = 1;
            tp[i].msg_size = j;
        }
        if (!rank && verbose) {
            printf("\nBlocking communication with message size %6d bytes\n", j);
            printf("------------------------------------------------------\n");
        }
        loops();
    }

    /* Phase 3: non-blocking communication over the same size sweep */
    for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
        for (i = 0; i < MAX_THREADS; i++) {
            tp[i].thread_id = i;
            tp[i].use_proc_null = 0;
            tp[i].use_blocking_comm = 0;
            tp[i].msg_size = j;
        }
        if (!rank && verbose) {
            printf("\nNon-blocking communication with message size %6d bytes\n", j);
            printf("----------------------------------------------------------\n");
        }
        loops();
    }

    if (rank == 0) {
        printf(" No Errors\n");
    }

    MPI_Finalize();

    return 0;
}