1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 /* This program provides a simple test of send-receive performance between
7 two (or more) processes. This is sometimes called a head-to-head or
8 ping-ping test, as both processes send at the same time.
9 */
10
11 #include "mpi.h"
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <assert.h>
15 #include "mpitest.h"
16 #include "mpithreadtest.h"
17
18 #define CACHELINE_SIZE 64
19
20 #define MESSAGE_SIZE 8
21 #define NUM_MESSAGES 64000
22 #define WINDOW_SIZE 64
23
24 #define ERROR_MARGIN 0.05 /* FIXME: a better margin? */
25
26 MPI_Comm *thread_comms;
27 double *t_elapsed;
28
29 MTEST_THREAD_RETURN_TYPE thread_fn(void *arg);
30
thread_fn(void * arg)31 MTEST_THREAD_RETURN_TYPE thread_fn(void *arg)
32 {
33 int error;
34 int tid;
35 MPI_Comm my_comm;
36 int rank;
37 int win_i, win_post_i, win_posts;
38 void *buf;
39 int sync_buf;
40 MPI_Request requests[WINDOW_SIZE];
41 MPI_Status statuses[WINDOW_SIZE];
42 double t_start, t_end;
43
44 tid = (int) (long) arg;
45 my_comm = thread_comms[tid];
46 MPI_Comm_rank(my_comm, &rank);
47
48 win_posts = NUM_MESSAGES / WINDOW_SIZE;
49 assert(win_posts * WINDOW_SIZE == NUM_MESSAGES);
50
51 /* Allocate a cache-aligned buffer to prevent potential effects of serialization:
52 * either false-sharing on the CPU or serialization in the NIC's parallel TLB
53 * engine
54 */
55 error = posix_memalign(&buf, CACHELINE_SIZE, MESSAGE_SIZE * sizeof(char));
56 if (error) {
57 fprintf(stderr, "Thread %d: Error in allocating send buffer\n", tid);
58 }
59
60 /* Benchmark */
61 t_start = MPI_Wtime();
62
63 for (win_post_i = 0; win_post_i < win_posts; win_post_i++) {
64 for (win_i = 0; win_i < WINDOW_SIZE; win_i++) {
65 if (rank == 0) {
66 MPI_Isend(buf, MESSAGE_SIZE, MPI_CHAR, 1, tid, my_comm, &requests[win_i]);
67 } else {
68 MPI_Irecv(buf, MESSAGE_SIZE, MPI_CHAR, 0, tid, my_comm, &requests[win_i]);
69 }
70 }
71 MPI_Waitall(WINDOW_SIZE, requests, statuses);
72 }
73
74 /* Sync */
75 if (rank == 0) {
76 MPI_Recv(&sync_buf, 1, MPI_INT, 1, tid, my_comm, MPI_STATUS_IGNORE);
77 } else {
78 MPI_Send(&sync_buf, 1, MPI_INT, 0, tid, my_comm);
79 }
80
81 if (rank == 0) {
82 t_end = MPI_Wtime();
83 t_elapsed[tid] = t_end - t_start;
84 }
85
86 free(buf);
87 }
88
89
main(int argc,char * argv[])90 int main(int argc, char *argv[])
91 {
92 int rank, size;
93 int provided;
94 int num_threads;
95 double onethread_msg_rate, multithread_msg_rate;
96 int errors;
97 MPI_Info info;
98
99 if (argc > 2) {
100 fprintf(stderr, "Can support at most only the -nthreads argument.\n");
101 MPI_Abort(MPI_COMM_WORLD, 1);
102 }
103
104 MTest_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
105
106 if (provided != MPI_THREAD_MULTIPLE) {
107 fprintf(stderr, "MPI_THREAD_MULTIPLE required for this test.\n");
108 MPI_Abort(MPI_COMM_WORLD, 1);
109 }
110
111 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
112 MPI_Comm_size(MPI_COMM_WORLD, &size);
113 if (size != 2) {
114 fprintf(stderr, "please run with exactly two processes.\n");
115 MPI_Abort(MPI_COMM_WORLD, 1);
116 }
117
118 errors = MTest_thread_barrier_init();
119 if (errors) {
120 fprintf(stderr, "Could not create thread barrier\n");
121 MPI_Abort(MPI_COMM_WORLD, 1);
122 }
123
124 MTestArgList *head = MTestArgListCreate(argc, argv);
125 num_threads = MTestArgListGetInt(head, "nthreads");
126 MTestArgListDestroy(head);
127
128 thread_comms = (MPI_Comm *) malloc(sizeof(MPI_Comm) * num_threads);
129 t_elapsed = calloc(num_threads, sizeof(double));
130
131 /* Create a communicator per thread */
132 MPI_Info_create(&info);
133 MPI_Info_set(info, "mpi_assert_new_vci", "true");
134 for (int i = 0; i < num_threads; i++) {
135 MPI_Comm_dup_with_info(MPI_COMM_WORLD, info, &thread_comms[i]);
136 }
137
138 /* Run test with 1 thread */
139 thread_fn((void *) 0);
140 onethread_msg_rate = ((double) NUM_MESSAGES / t_elapsed[0]) / 1e6;
141
142 /* Run test with multiple threads */
143 for (int i = 1; i < num_threads; i++) {
144 MTest_Start_thread(thread_fn, (void *) (long) i);
145 }
146 thread_fn((void *) 0);
147
148 MTest_Join_threads();
149
150 MTest_thread_barrier_free();
151
152 /* Calculate message rate with multiple threads */
153 if (rank == 0) {
154 MTestPrintfMsg(1, "Number of messages: %d\n", NUM_MESSAGES);
155 MTestPrintfMsg(1, "Message size: %d\n", MESSAGE_SIZE);
156 MTestPrintfMsg(1, "Window size: %d\n", WINDOW_SIZE);
157 MTestPrintfMsg(1, "Mmsgs/s with one thread: %-10.2f\n\n", onethread_msg_rate);
158 MTestPrintfMsg(1, "%-10s\t%-10s\t%-10s\n", "Thread", "Mmsgs/s", "Error");
159
160 multithread_msg_rate = 0;
161 errors = 0;
162 for (int tid = 0; tid < num_threads; tid++) {
163 double my_msg_rate = ((double) NUM_MESSAGES / t_elapsed[tid]) / 1e6;
164 int my_error = 0;
165 if ((1 - (my_msg_rate / onethread_msg_rate)) > ERROR_MARGIN) {
166 /* Erroneous */
167 errors++;
168 my_error = 1;
169 fprintf(stderr,
170 "Thread %d message rate below threshold: %.2f / %.2f = %.2f (threshold = %.2f)\n",
171 tid, my_msg_rate, onethread_msg_rate, (my_msg_rate / onethread_msg_rate),
172 ERROR_MARGIN);
173 }
174 MTestPrintfMsg(1, "%-10d\t%-10.2f\t%-10d\n", tid, my_msg_rate, my_error);
175 multithread_msg_rate += my_msg_rate;
176 }
177 MTestPrintfMsg(1, "\n%-10s\t%-10s\t%-10s\t%-10s\n", "Size", "Threads", "Mmsgs/s", "Errors");
178 MTestPrintfMsg(1, "%-10d\t%-10d\t%-10.2f\t%-10d\n", MESSAGE_SIZE, num_threads,
179 multithread_msg_rate, errors);
180 }
181
182 for (int i = 0; i < num_threads; i++) {
183 MPI_Comm_free(&thread_comms[i]);
184 }
185 MPI_Info_free(&info);
186 free(thread_comms);
187 free(t_elapsed);
188
189 MTest_Finalize(errors);
190
191 return 0;
192 }
193