1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #include "mpiimpl.h"
7 #include "mpi_init.h"
8
9 /*
10 === BEGIN_MPI_T_CVAR_INFO_BLOCK ===
11
12 cvars:
13 - name : MPIR_CVAR_ASYNC_PROGRESS
14 category : THREADS
15 type : boolean
16 default : false
17 class : none
18 verbosity : MPI_T_VERBOSITY_USER_BASIC
19 scope : MPI_T_SCOPE_ALL_EQ
20 description : >-
21 If set to true, MPICH will initiate an additional thread to
22 make asynchronous progress on all communication operations
23 including point-to-point, collective, one-sided operations and
24 I/O. Setting this variable will automatically increase the
25 thread-safety level to MPI_THREAD_MULTIPLE. While this
26 improves the progress semantics, it might cause a small amount
27 of performance overhead for regular MPI operations. The user
28 is encouraged to leave one or more hardware threads vacant in
29 order to prevent contention between the application threads
30 and the progress thread(s). The impact of oversubscription is
31 highly system dependent but may be substantial in some cases,
32 hence this recommendation.
33
34
35 === END_MPI_T_CVAR_INFO_BLOCK ===
36 */
37
38 #if defined(MPICH_IS_THREADED) && MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
39
40 static int MPIR_async_thread_initialized = 0;
41
42 static MPIR_Comm *progress_comm_ptr;
43 static MPID_Thread_id_t progress_thread_id;
44
45 /* We can use whatever tag we want; we use a different communicator
46 * for communicating with the progress thread. */
47 #define WAKE_TAG 100
48
progress_fn(void * data)49 static void progress_fn(void *data)
50 {
51 int mpi_errno = MPI_SUCCESS;
52 MPIR_Request *request_ptr = NULL;
53 MPI_Request request;
54 MPI_Status status;
55
56 /* Explicitly add CS_ENTER/EXIT since this thread is created from
57 * within an internal function and will call NMPI functions
58 * directly. */
59 MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
60
61 /* FIXME: We assume that waiting on some request forces progress
62 * on all requests. With fine-grained threads, will this still
63 * work as expected? We can imagine an approach where a request on
64 * a non-conflicting communicator would not touch the remaining
65 * requests to avoid locking issues. Once the fine-grained threads
66 * code is fully functional, we need to revisit this and, if
67 * appropriate, either change what we do in this thread, or delete
68 * this comment. */
69
70 mpi_errno = MPID_Irecv(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
71 MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
72 MPIR_Assert(!mpi_errno);
73 request = request_ptr->handle;
74 mpi_errno = MPIR_Wait(&request, &status);
75 MPIR_Assert(!mpi_errno);
76
77 MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
78
79 return;
80 }
81
82 /* called inside MPID_Init_async_thread to provide device override */
MPIR_Init_async_thread(void)83 int MPIR_Init_async_thread(void)
84 {
85 int mpi_errno = MPI_SUCCESS;
86 MPIR_Comm *comm_self_ptr;
87 int err = 0;
88 MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
89
90 MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
91
92
93 /* Dup comm world for the progress thread */
94 MPIR_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
95 mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, NULL, &progress_comm_ptr);
96 MPIR_ERR_CHECK(mpi_errno);
97
98 MPID_Thread_create((MPID_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err);
99 MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
100 strerror(err));
101
102 MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
103
104 fn_exit:
105 return mpi_errno;
106 fn_fail:
107 goto fn_exit;
108 }
109
110 /* called inside MPID_Finalize_async_thread to provide device override */
MPIR_Finalize_async_thread(void)111 int MPIR_Finalize_async_thread(void)
112 {
113 int mpi_errno = MPI_SUCCESS;
114 MPIR_Request *request_ptr = NULL;
115 MPI_Request request;
116 MPI_Status status;
117 MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
118
119 MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
120
121 MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
122
123 mpi_errno = MPID_Isend(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
124 MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
125 MPIR_Assert(!mpi_errno);
126 request = request_ptr->handle;
127 mpi_errno = MPIR_Wait(&request, &status);
128 MPIR_Assert(!mpi_errno);
129
130 MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
131
132 MPID_Thread_join(progress_thread_id);
133
134 mpi_errno = MPIR_Comm_free_impl(progress_comm_ptr);
135 MPIR_Assert(!mpi_errno);
136
137 MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
138
139 return mpi_errno;
140 }
141
142 /* called inside MPIR_Init_thread */
MPII_init_async(void)143 int MPII_init_async(void)
144 {
145 int mpi_errno = MPI_SUCCESS;
146
147 if (MPIR_CVAR_ASYNC_PROGRESS) {
148 if (MPIR_ThreadInfo.thread_provided == MPI_THREAD_MULTIPLE) {
149 mpi_errno = MPID_Init_async_thread();
150 if (mpi_errno)
151 goto fn_fail;
152
153 MPIR_async_thread_initialized = 1;
154 } else {
155 printf("WARNING: No MPI_THREAD_MULTIPLE support (needed for async progress)\n");
156 }
157 }
158 fn_exit:
159 return mpi_errno;
160 fn_fail:
161 goto fn_exit;
162 }
163
164 /* called inside MPI_Finalize */
MPII_finalize_async(void)165 int MPII_finalize_async(void)
166 {
167 int mpi_errno = MPI_SUCCESS;
168
169 /* If the user requested for asynchronous progress, we need to
170 * shutdown the progress thread */
171 if (MPIR_async_thread_initialized) {
172 mpi_errno = MPID_Finalize_async_thread();
173 }
174
175 return mpi_errno;
176 }
177
178 #else
MPIR_Finalize_async_thread(void)179 int MPIR_Finalize_async_thread(void)
180 {
181 return MPI_SUCCESS;
182 }
183
MPIR_Init_async_thread(void)184 int MPIR_Init_async_thread(void)
185 {
186 return MPI_SUCCESS;
187 }
188
MPII_init_async(void)189 int MPII_init_async(void)
190 {
191 return MPI_SUCCESS;
192 }
193
MPII_finalize_async(void)194 int MPII_finalize_async(void)
195 {
196 return MPI_SUCCESS;
197 }
198 #endif
199