1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #include "mpiimpl.h"
7 #include "mpi_init.h"
8 
9 /*
10 === BEGIN_MPI_T_CVAR_INFO_BLOCK ===
11 
12 cvars:
13     - name        : MPIR_CVAR_ASYNC_PROGRESS
14       category    : THREADS
15       type        : boolean
16       default     : false
17       class       : none
18       verbosity   : MPI_T_VERBOSITY_USER_BASIC
19       scope       : MPI_T_SCOPE_ALL_EQ
20       description : >-
21         If set to true, MPICH will initiate an additional thread to
22         make asynchronous progress on all communication operations
23         including point-to-point, collective, one-sided operations and
24         I/O.  Setting this variable will automatically increase the
25         thread-safety level to MPI_THREAD_MULTIPLE.  While this
26         improves the progress semantics, it might cause a small amount
27         of performance overhead for regular MPI operations.  The user
28         is encouraged to leave one or more hardware threads vacant in
29         order to prevent contention between the application threads
30         and the progress thread(s).  The impact of oversubscription is
31         highly system dependent but may be substantial in some cases,
32         hence this recommendation.
33 
34 
35 === END_MPI_T_CVAR_INFO_BLOCK ===
36 */
37 
38 #if defined(MPICH_IS_THREADED) && MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
39 
/* Set to 1 by MPII_init_async after MPID_Init_async_thread succeeds; read by
 * MPII_finalize_async to decide whether a progress thread must be shut down. */
static int MPIR_async_thread_initialized = 0;

/* Private duplicate of MPI_COMM_SELF created in MPIR_Init_async_thread; it
 * only carries the WAKE_TAG message used to stop the progress thread. */
static MPIR_Comm *progress_comm_ptr;
/* Id of the thread running progress_fn; joined in MPIR_Finalize_async_thread. */
static MPID_Thread_id_t progress_thread_id;

/* We can use whatever tag we want; we use a different communicator
 * for communicating with the progress thread, so the wake-up message
 * cannot be matched by application traffic. */
#define WAKE_TAG 100
48 
progress_fn(void * data)49 static void progress_fn(void *data)
50 {
51     int mpi_errno = MPI_SUCCESS;
52     MPIR_Request *request_ptr = NULL;
53     MPI_Request request;
54     MPI_Status status;
55 
56     /* Explicitly add CS_ENTER/EXIT since this thread is created from
57      * within an internal function and will call NMPI functions
58      * directly. */
59     MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
60 
61     /* FIXME: We assume that waiting on some request forces progress
62      * on all requests. With fine-grained threads, will this still
63      * work as expected? We can imagine an approach where a request on
64      * a non-conflicting communicator would not touch the remaining
65      * requests to avoid locking issues. Once the fine-grained threads
66      * code is fully functional, we need to revisit this and, if
67      * appropriate, either change what we do in this thread, or delete
68      * this comment. */
69 
70     mpi_errno = MPID_Irecv(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
71                            MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
72     MPIR_Assert(!mpi_errno);
73     request = request_ptr->handle;
74     mpi_errno = MPIR_Wait(&request, &status);
75     MPIR_Assert(!mpi_errno);
76 
77     MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
78 
79     return;
80 }
81 
82 /* called inside MPID_Init_async_thread to provide device override */
MPIR_Init_async_thread(void)83 int MPIR_Init_async_thread(void)
84 {
85     int mpi_errno = MPI_SUCCESS;
86     MPIR_Comm *comm_self_ptr;
87     int err = 0;
88     MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
89 
90     MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
91 
92 
93     /* Dup comm world for the progress thread */
94     MPIR_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
95     mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, NULL, &progress_comm_ptr);
96     MPIR_ERR_CHECK(mpi_errno);
97 
98     MPID_Thread_create((MPID_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err);
99     MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
100                          strerror(err));
101 
102     MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
103 
104   fn_exit:
105     return mpi_errno;
106   fn_fail:
107     goto fn_exit;
108 }
109 
110 /* called inside MPID_Finalize_async_thread to provide device override */
MPIR_Finalize_async_thread(void)111 int MPIR_Finalize_async_thread(void)
112 {
113     int mpi_errno = MPI_SUCCESS;
114     MPIR_Request *request_ptr = NULL;
115     MPI_Request request;
116     MPI_Status status;
117     MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
118 
119     MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
120 
121     MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
122 
123     mpi_errno = MPID_Isend(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
124                            MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
125     MPIR_Assert(!mpi_errno);
126     request = request_ptr->handle;
127     mpi_errno = MPIR_Wait(&request, &status);
128     MPIR_Assert(!mpi_errno);
129 
130     MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
131 
132     MPID_Thread_join(progress_thread_id);
133 
134     mpi_errno = MPIR_Comm_free_impl(progress_comm_ptr);
135     MPIR_Assert(!mpi_errno);
136 
137     MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
138 
139     return mpi_errno;
140 }
141 
142 /* called inside MPIR_Init_thread */
MPII_init_async(void)143 int MPII_init_async(void)
144 {
145     int mpi_errno = MPI_SUCCESS;
146 
147     if (MPIR_CVAR_ASYNC_PROGRESS) {
148         if (MPIR_ThreadInfo.thread_provided == MPI_THREAD_MULTIPLE) {
149             mpi_errno = MPID_Init_async_thread();
150             if (mpi_errno)
151                 goto fn_fail;
152 
153             MPIR_async_thread_initialized = 1;
154         } else {
155             printf("WARNING: No MPI_THREAD_MULTIPLE support (needed for async progress)\n");
156         }
157     }
158   fn_exit:
159     return mpi_errno;
160   fn_fail:
161     goto fn_exit;
162 }
163 
164 /* called inside MPI_Finalize */
MPII_finalize_async(void)165 int MPII_finalize_async(void)
166 {
167     int mpi_errno = MPI_SUCCESS;
168 
169     /* If the user requested for asynchronous progress, we need to
170      * shutdown the progress thread */
171     if (MPIR_async_thread_initialized) {
172         mpi_errno = MPID_Finalize_async_thread();
173     }
174 
175     return mpi_errno;
176 }
177 
178 #else
MPIR_Finalize_async_thread(void)179 int MPIR_Finalize_async_thread(void)
180 {
181     return MPI_SUCCESS;
182 }
183 
MPIR_Init_async_thread(void)184 int MPIR_Init_async_thread(void)
185 {
186     return MPI_SUCCESS;
187 }
188 
MPII_init_async(void)189 int MPII_init_async(void)
190 {
191     return MPI_SUCCESS;
192 }
193 
MPII_finalize_async(void)194 int MPII_finalize_async(void)
195 {
196     return MPI_SUCCESS;
197 }
198 #endif
199