1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 *
4 * (C) 2003 by Argonne National Laboratory.
5 * See COPYRIGHT in top-level directory.
6 */
7
8 #include "mpi.h"
9 #include "mpitest.h"
10 #include "mpithreadtest.h"
11 #include <stdio.h>
12 #include <stdlib.h>
13
14 #define MAX_THREADS 4
15 /* #define LOOPS 10000 */
16 #define LOOPS 1000
17 #define WINDOW 16
18 /* #define MAX_MSG_SIZE 16384 */
19 #define MAX_MSG_SIZE 4096
20 #define HOP 4
21
/* Emulated thread private storage */
struct tp {
    int thread_id;          /* index of this slot / thread (0 == main thread) */
    int use_proc_null;      /* nonzero: peer is MPI_PROC_NULL (no real transfer) */
    int use_blocking_comm;  /* nonzero: MPI_Send/MPI_Recv; zero: Isend/Irecv + Waitall */
    int msg_size;           /* message size in bytes for this measurement */
    double latency;         /* per-message latency in microseconds, set by run_test */
} tp[MAX_THREADS];

int size, rank;             /* MPI_COMM_WORLD size and this process's rank */
char sbuf[MAX_MSG_SIZE], rbuf[MAX_MSG_SIZE];  /* shared send/receive buffers */
static int verbose = 0;     /* enabled via the MPITEST_VERBOSE environment variable */
static volatile int num_threads;                /* threads participating in the current round; guarded by num_threads_lock while changing */
static MTEST_THREAD_LOCK_TYPE num_threads_lock; /* protects num_threads during thread startup */

/* Print a message and abort the entire MPI job. */
#define ABORT_MSG(msg_) do { printf("%s", (msg_)); MPI_Abort(MPI_COMM_WORLD, 1); } while (0)
38
39 MTEST_THREAD_RETURN_TYPE run_test(void *arg);
run_test(void * arg)40 MTEST_THREAD_RETURN_TYPE run_test(void *arg)
41 {
42 int thread_id = (int)(long) arg;
43 int i, j, peer;
44 MPI_Status status[WINDOW];
45 MPI_Request req[WINDOW];
46 double start, end;
47 int err;
48 int local_num_threads = -1;
49
50 if (tp[thread_id].use_proc_null)
51 peer = MPI_PROC_NULL;
52 else
53 peer = (rank % 2) ? rank - 1 : rank + 1;
54
55 err = MTest_thread_lock(&num_threads_lock);
56 if (err) ABORT_MSG("unable to acquire lock, aborting\n");
57 local_num_threads = num_threads;
58 err = MTest_thread_unlock(&num_threads_lock);
59 if (err) ABORT_MSG("unable to release lock, aborting\n");
60
61 MTest_thread_barrier(num_threads);
62
63 start = MPI_Wtime();
64
65 if (tp[thread_id].use_blocking_comm) {
66 if ((rank % 2) == 0) {
67 for (i = 0; i < LOOPS; i++)
68 for (j = 0; j < WINDOW; j++)
69 MPI_Send(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD);
70 }
71 else {
72 for (i = 0; i < LOOPS; i++)
73 for (j = 0; j < WINDOW; j++)
74 MPI_Recv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
75 &status[0]);
76 }
77 }
78 else {
79 for (i = 0; i < LOOPS; i++) {
80 if ((rank % 2) == 0) {
81 for (j = 0; j < WINDOW; j++)
82 MPI_Isend(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
83 &req[j]);
84 }
85 else {
86 for (j = 0; j < WINDOW; j++)
87 MPI_Irecv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
88 &req[j]);
89 }
90 MPI_Waitall(WINDOW, req, status);
91 }
92 }
93
94 end = MPI_Wtime();
95 tp[thread_id].latency = 1000000.0 * (end - start) / (LOOPS * WINDOW);
96
97 MTest_thread_barrier(num_threads);
98 return MTEST_THREAD_RETVAL_IGN;
99 }
100
void loops(void);

/* Run one timed measurement for each thread count from 1 to MAX_THREADS,
 * using the per-thread parameters already stored in tp[] by main().
 * For each count: spawn the extra threads, run run_test() on all of them
 * (including the main thread as thread 0), then average the per-thread
 * latencies and optionally print the local latency and message rate. */
void loops(void)
{
    int i, nt;
    double latency, mrate, avg_latency, agg_mrate;
    int err;

    err = MTest_thread_lock_create(&num_threads_lock);
    if (err) ABORT_MSG("unable to create lock, aborting\n");

    for (nt = 1; nt <= MAX_THREADS; nt++) {
        /* Hold the lock across thread creation: workers take this lock
         * before reading num_threads, so they cannot observe a partially
         * updated count. */
        err = MTest_thread_lock(&num_threads_lock);
        if (err) ABORT_MSG("unable to acquire lock, aborting\n");

        num_threads = 1;    /* the main thread always participates */
        MPI_Barrier(MPI_COMM_WORLD);
        MTest_thread_barrier_init();

        for (i = 1; i < nt; i++) {
            err = MTest_Start_thread(run_test, (void *)(long)i);
            if (err) {
                /* attempt to continue with fewer threads, we may be on a
                 * thread-constrained platform like BG/P in DUAL mode */
                break;
            }
            ++num_threads;
        }

        err = MTest_thread_unlock(&num_threads_lock);
        if (err) ABORT_MSG("unable to release lock, aborting\n");

        /* If we wanted more than one thread but could not create any,
         * there is no point continuing. */
        if (nt > 1 && num_threads <= 1) {
            ABORT_MSG("unable to create any additional threads, aborting\n");
        }

        run_test((void *) 0); /* we are thread 0 */
        err = MTest_Join_threads();
        if (err) {
            printf("error joining threads, err=%d", err);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }

        MTest_thread_barrier_free();

        /* Average the per-thread latencies measured by run_test. */
        latency = 0;
        for (i = 0; i < num_threads; i++)
            latency += tp[i].latency;
        latency /= num_threads; /* Average latency */
        mrate = num_threads / latency; /* Message rate */

        /* Global latency and message rate */
        /* NOTE(review): avg_latency and agg_mrate are reduced to rank 0 but
         * never printed or otherwise used; only the local values appear in
         * the verbose output below. */
        MPI_Reduce(&latency, &avg_latency, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
        avg_latency /= size;
        MPI_Reduce(&mrate, &agg_mrate, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

        if (!rank && verbose) {
            printf("Threads: %d; Latency: %.3f; Mrate: %.3f\n",
                   num_threads, latency, mrate);
        }
    }

    err = MTest_thread_lock_free(&num_threads_lock);
    if (err) ABORT_MSG("unable to free lock, aborting\n");
}
165
main(int argc,char ** argv)166 int main(int argc, char ** argv)
167 {
168 int pmode, i, j;
169
170 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &pmode);
171 if (pmode != MPI_THREAD_MULTIPLE) {
172 fprintf(stderr, "Thread Multiple not supported by the MPI implementation\n");
173 MPI_Abort(MPI_COMM_WORLD, -1);
174 }
175
176 MPI_Comm_size(MPI_COMM_WORLD, &size);
177 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
178
179 if (getenv("MPITEST_VERBOSE"))
180 verbose = 1;
181
182 /* For communication, we need an even number of processes */
183 if (size % 2) {
184 fprintf(stderr, "This test needs an even number of processes\n");
185 MPI_Abort(MPI_COMM_WORLD, -1);
186 }
187
188 /* PROC_NULL */
189 for (i = 0; i < MAX_THREADS; i++) {
190 tp[i].thread_id = i;
191 tp[i].use_proc_null = 1;
192 tp[i].use_blocking_comm = 1;
193 tp[i].msg_size = 0;
194 }
195 if (!rank && verbose) {
196 printf("\nUsing MPI_PROC_NULL\n");
197 printf("-------------------\n");
198 }
199 loops();
200
201 /* Blocking communication */
202 for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
203 for (i = 0; i < MAX_THREADS; i++) {
204 tp[i].thread_id = i;
205 tp[i].use_proc_null = 0;
206 tp[i].use_blocking_comm = 1;
207 tp[i].msg_size = j;
208 }
209 if (!rank && verbose) {
210 printf("\nBlocking communication with message size %6d bytes\n", j);
211 printf("------------------------------------------------------\n");
212 }
213 loops();
214 }
215
216 /* Non-blocking communication */
217 for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
218 for (i = 0; i < MAX_THREADS; i++) {
219 tp[i].thread_id = i;
220 tp[i].use_proc_null = 0;
221 tp[i].use_blocking_comm = 0;
222 tp[i].msg_size = j;
223 }
224 if (!rank && verbose) {
225 printf("\nNon-blocking communication with message size %6d bytes\n", j);
226 printf("----------------------------------------------------------\n");
227 }
228 loops();
229 }
230
231 if (rank == 0) {
232 printf( " No Errors\n" );
233 }
234
235 MPI_Finalize();
236
237 return 0;
238 }
239