1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #ifndef CH4R_RECVQ_H_INCLUDED
7 #define CH4R_RECVQ_H_INCLUDED
8 
9 #include <mpidimpl.h>
10 #include "mpidig_am.h"
11 #include "utlist.h"
12 #include "ch4_impl.h"
13 
/*
 * Performance variables updated by the queue helpers in this header through
 * the MPIR_T_PVAR_* macros (category RECVQ).  They are only declared here;
 * the definitions live outside this header.
 * NOTE(review): ATTRIBUTE((unused)) presumably silences warnings in builds
 * where the MPIR_T_PVAR_* macros do not reference these names -- confirm.
 */
/* Level PVARs: adjusted by +1 / -1 on every enqueue / removal below. */
extern unsigned PVAR_LEVEL_posted_recvq_length ATTRIBUTE((unused));
extern unsigned PVAR_LEVEL_unexpected_recvq_length ATTRIBUTE((unused));
/* Counter PVARs: incremented once per list element inspected during a search. */
extern unsigned long long PVAR_COUNTER_posted_recvq_match_attempts ATTRIBUTE((unused));
extern unsigned long long PVAR_COUNTER_unexpected_recvq_match_attempts ATTRIBUTE((unused));
/* Timer PVARs: started/stopped around the list searches below. */
extern MPIR_T_pvar_timer_t PVAR_TIMER_time_failed_matching_postedq ATTRIBUTE((unused));
extern MPIR_T_pvar_timer_t PVAR_TIMER_time_matching_unexpectedq ATTRIBUTE((unused));

/*
 * Receive-queue initialization entry point; defined outside this header.
 * NOTE(review): presumably sets up the PVARs declared above and returns an
 * MPI error code -- confirm against the definition.
 */
int MPIDIG_recvq_init(void);
22 
MPIDIG_match_posted(int rank,int tag,MPIR_Context_id_t context_id,MPIR_Request * req)23 MPL_STATIC_INLINE_PREFIX int MPIDIG_match_posted(int rank, int tag,
24                                                  MPIR_Context_id_t context_id, MPIR_Request * req)
25 {
26     return (rank == MPIDIG_REQUEST(req, rank) || MPIDIG_REQUEST(req, rank) == MPI_ANY_SOURCE) &&
27         (tag == MPIR_TAG_MASK_ERROR_BITS(MPIDIG_REQUEST(req, tag)) ||
28          MPIDIG_REQUEST(req, tag) == MPI_ANY_TAG) && context_id == MPIDIG_REQUEST(req, context_id);
29 }
30 
MPIDIG_match_unexp(int rank,int tag,MPIR_Context_id_t context_id,MPIR_Request * req)31 MPL_STATIC_INLINE_PREFIX int MPIDIG_match_unexp(int rank, int tag,
32                                                 MPIR_Context_id_t context_id, MPIR_Request * req)
33 {
34     return (rank == MPIDIG_REQUEST(req, rank) || rank == MPI_ANY_SOURCE) &&
35         (tag == MPIR_TAG_MASK_ERROR_BITS(MPIDIG_REQUEST(req, tag)) ||
36          tag == MPI_ANY_TAG) && context_id == MPIDIG_REQUEST(req, context_id);
37 }
38 
39 #ifdef MPIDI_CH4U_USE_PER_COMM_QUEUE
40 
MPIDIG_enqueue_posted(MPIR_Request * req,MPIDIG_rreq_t ** list)41 MPL_STATIC_INLINE_PREFIX void MPIDIG_enqueue_posted(MPIR_Request * req, MPIDIG_rreq_t ** list)
42 {
43     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_ENQUEUE_POSTED);
44     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_ENQUEUE_POSTED);
45     MPIDIG_REQUEST(req, req->rreq.request) = req;
46     DL_APPEND(*list, &req->dev.ch4.am.req->rreq);
47     MPIR_T_PVAR_LEVEL_INC(RECVQ, posted_recvq_length, 1);
48     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_ENQUEUE_POSTED);
49 }
50 
MPIDIG_enqueue_unexp(MPIR_Request * req,MPIDIG_rreq_t ** list)51 MPL_STATIC_INLINE_PREFIX void MPIDIG_enqueue_unexp(MPIR_Request * req, MPIDIG_rreq_t ** list)
52 {
53     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_ENQUEUE_UNEXP);
54     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_ENQUEUE_UNEXP);
55     MPIDIG_REQUEST(req, req->rreq.request) = req;
56     DL_APPEND(*list, &req->dev.ch4.am.req->rreq);
57     MPIR_T_PVAR_LEVEL_INC(RECVQ, unexpected_recvq_length, 1);
58     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_ENQUEUE_UNEXP);
59 }
60 
MPIDIG_delete_unexp(MPIR_Request * req,MPIDIG_rreq_t ** list)61 MPL_STATIC_INLINE_PREFIX void MPIDIG_delete_unexp(MPIR_Request * req, MPIDIG_rreq_t ** list)
62 {
63     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DELETE_UNEXP);
64     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DELETE_UNEXP);
65     DL_DELETE(*list, &req->dev.ch4.am.req->rreq);
66     MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
67     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DELETE_UNEXP);
68 }
69 
MPIDIG_dequeue_unexp_strict(int rank,int tag,MPIR_Context_id_t context_id,MPIDIG_rreq_t ** list)70 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_dequeue_unexp_strict(int rank, int tag,
71                                                                    MPIR_Context_id_t context_id,
72                                                                    MPIDIG_rreq_t ** list)
73 {
74     MPIDIG_rreq_t *curr, *tmp;
75     MPIR_Request *req = NULL;
76     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DEQUEUE_UNEXP_STRICT);
77     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DEQUEUE_UNEXP_STRICT);
78 
79     MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);
80     DL_FOREACH_SAFE(*list, curr, tmp) {
81         MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
82         req = curr->request;
83         if (!(MPIDIG_REQUEST(req, req->status) & MPIDIG_REQ_BUSY) &&
84             MPIDIG_match_unexp(rank, tag, context_id, req)) {
85             DL_DELETE(*list, curr);
86             MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
87             break;
88         }
89         req = NULL;
90     }
91     MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);
92     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DEQUEUE_UNEXP_STRICT);
93     return req;
94 }
95 
MPIDIG_dequeue_unexp(int rank,int tag,MPIR_Context_id_t context_id,MPIDIG_rreq_t ** list)96 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_dequeue_unexp(int rank, int tag,
97                                                             MPIR_Context_id_t context_id,
98                                                             MPIDIG_rreq_t ** list)
99 {
100     MPIDIG_rreq_t *curr, *tmp;
101     MPIR_Request *req = NULL;
102     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DEQUEUE_UNEXP);
103     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DEQUEUE_UNEXP);
104 
105     MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);
106     DL_FOREACH_SAFE(*list, curr, tmp) {
107         MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
108         req = curr->request;
109         if (MPIDIG_match_unexp(rank, tag, context_id, req)) {
110             DL_DELETE(*list, curr);
111             MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
112             break;
113         }
114         req = NULL;
115     }
116     MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);
117     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DEQUEUE_UNEXP);
118     return req;
119 }
120 
MPIDIG_find_unexp(int rank,int tag,MPIR_Context_id_t context_id,MPIDIG_rreq_t ** list)121 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_find_unexp(int rank, int tag,
122                                                          MPIR_Context_id_t context_id,
123                                                          MPIDIG_rreq_t ** list)
124 {
125     MPIDIG_rreq_t *curr, *tmp;
126     MPIR_Request *req = NULL;
127     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_FIND_UNEXP);
128     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_FIND_UNEXP);
129 
130     MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);
131     DL_FOREACH_SAFE(*list, curr, tmp) {
132         MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
133         req = curr->request;
134         if (MPIDIG_match_unexp(rank, tag, context_id, req)) {
135             break;
136         }
137         req = NULL;
138     }
139     MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);
140     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_FIND_UNEXP);
141     return req;
142 }
143 
MPIDIG_dequeue_posted(int rank,int tag,MPIR_Context_id_t context_id,int is_local,MPIDIG_rreq_t ** list)144 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_dequeue_posted(int rank, int tag,
145                                                              MPIR_Context_id_t context_id,
146                                                              int is_local, MPIDIG_rreq_t ** list)
147 {
148     MPIR_Request *req = NULL;
149     MPIDIG_rreq_t *curr, *tmp;
150     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DEQUEUE_POSTED);
151     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DEQUEUE_POSTED);
152 
153     MPIR_T_PVAR_TIMER_START(RECVQ, time_failed_matching_postedq);
154     DL_FOREACH_SAFE(*list, curr, tmp) {
155         MPIR_T_PVAR_COUNTER_INC(RECVQ, posted_recvq_match_attempts, 1);
156 #ifndef MPIDI_CH4_DIRECT_NETMOD
157         /* NOTE: extra negation to force logical comparisons */
158         if (!MPIDI_REQUEST(curr->request, is_local) != !is_local) {
159             continue;
160         }
161 #endif
162         if (MPIDIG_match_posted(rank, tag, context_id, curr->request)) {
163             req = curr->request;
164             DL_DELETE(*list, curr);
165             MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);
166             break;
167         }
168     }
169     if (!req)
170         MPIR_T_PVAR_TIMER_END(RECVQ, time_failed_matching_postedq);
171 
172     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DEQUEUE_POSTED);
173     return req;
174 }
175 
MPIDIG_delete_posted(MPIDIG_rreq_t * req,MPIDIG_rreq_t ** list)176 MPL_STATIC_INLINE_PREFIX int MPIDIG_delete_posted(MPIDIG_rreq_t * req, MPIDIG_rreq_t ** list)
177 {
178     int found = 0;
179     MPIDIG_rreq_t *curr, *tmp;
180     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DELETE_POSTED);
181     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DELETE_POSTED);
182     MPIR_T_PVAR_TIMER_START(RECVQ, time_failed_matching_postedq);
183     DL_FOREACH_SAFE(*list, curr, tmp) {
184         MPIR_T_PVAR_COUNTER_INC(RECVQ, posted_recvq_match_attempts, 1);
185         if (curr == req) {
186             DL_DELETE(*list, curr);
187             found = 1;
188             MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);
189             break;
190         }
191     }
192     if (!found)
193         MPIR_T_PVAR_TIMER_END(RECVQ, time_failed_matching_postedq);
194 
195     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DELETE_POSTED);
196     return found;
197 }
198 
199 #else /* #ifdef MPIDI_CH4U_USE_PER_COMM_QUEUE */
200 
201 /* Use global queue */
202 
MPIDIG_enqueue_posted(MPIR_Request * req,MPIDIG_rreq_t ** list)203 MPL_STATIC_INLINE_PREFIX void MPIDIG_enqueue_posted(MPIR_Request * req, MPIDIG_rreq_t ** list)
204 {
205     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_ENQUEUE_POSTED);
206     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_ENQUEUE_POSTED);
207     MPIDIG_REQUEST(req, req->rreq.request) = req;
208     DL_APPEND(MPIDI_global.posted_list, &req->dev.ch4.am.req->rreq);
209     MPIR_T_PVAR_LEVEL_INC(RECVQ, posted_recvq_length, 1);
210     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_ENQUEUE_POSTED);
211 }
212 
MPIDIG_enqueue_unexp(MPIR_Request * req,MPIDIG_rreq_t ** list)213 MPL_STATIC_INLINE_PREFIX void MPIDIG_enqueue_unexp(MPIR_Request * req, MPIDIG_rreq_t ** list)
214 {
215     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_ENQUEUE_UNEXP);
216     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_ENQUEUE_UNEXP);
217     MPIDIG_REQUEST(req, req->rreq.request) = req;
218     DL_APPEND(MPIDI_global.unexp_list, &req->dev.ch4.am.req->rreq);
219     MPIR_T_PVAR_LEVEL_INC(RECVQ, unexpected_recvq_length, 1);
220     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_ENQUEUE_UNEXP);
221 }
222 
MPIDIG_delete_unexp(MPIR_Request * req,MPIDIG_rreq_t ** list)223 MPL_STATIC_INLINE_PREFIX void MPIDIG_delete_unexp(MPIR_Request * req, MPIDIG_rreq_t ** list)
224 {
225     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DELETE_UNEXP);
226     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DELETE_UNEXP);
227     DL_DELETE(MPIDI_global.unexp_list, &req->dev.ch4.am.req->rreq);
228     MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
229     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DELETE_UNEXP);
230 }
231 
MPIDIG_dequeue_unexp_strict(int rank,int tag,MPIR_Context_id_t context_id,MPIDIG_rreq_t ** list)232 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_dequeue_unexp_strict(int rank, int tag,
233                                                                    MPIR_Context_id_t context_id,
234                                                                    MPIDIG_rreq_t ** list)
235 {
236     MPIDIG_rreq_t *curr, *tmp;
237     MPIR_Request *req = NULL;
238     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DEQUEUE_UNEXP_STRICT);
239     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DEQUEUE_UNEXP_STRICT);
240 
241     MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);
242     DL_FOREACH_SAFE(MPIDI_global.unexp_list, curr, tmp) {
243         MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
244         req = (MPIR_Request *) curr->request;
245         if (!(MPIDIG_REQUEST(req, req->status) & MPIDIG_REQ_BUSY) &&
246             MPIDIG_match_unexp(rank, tag, context_id, req)) {
247             DL_DELETE(MPIDI_global.unexp_list, curr);
248             MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
249             break;
250         }
251         req = NULL;
252     }
253     MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);
254     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DEQUEUE_UNEXP_STRICT);
255     return req;
256 }
257 
MPIDIG_dequeue_unexp(int rank,int tag,MPIR_Context_id_t context_id,MPIDIG_rreq_t ** list)258 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_dequeue_unexp(int rank, int tag,
259                                                             MPIR_Context_id_t context_id,
260                                                             MPIDIG_rreq_t ** list)
261 {
262     MPIDIG_rreq_t *curr, *tmp;
263     MPIR_Request *req = NULL;
264     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DEQUEUE_UNEXP);
265     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DEQUEUE_UNEXP);
266 
267     MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);
268     DL_FOREACH_SAFE(MPIDI_global.unexp_list, curr, tmp) {
269         MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
270         req = (MPIR_Request *) curr->request;
271         if (MPIDIG_match_unexp(rank, tag, context_id, req)) {
272             DL_DELETE(MPIDI_global.unexp_list, curr);
273             MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
274             break;
275         }
276         req = NULL;
277     }
278     MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);
279     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DEQUEUE_UNEXP);
280     return req;
281 }
282 
MPIDIG_find_unexp(int rank,int tag,MPIR_Context_id_t context_id,MPIDIG_rreq_t ** list)283 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_find_unexp(int rank, int tag,
284                                                          MPIR_Context_id_t context_id,
285                                                          MPIDIG_rreq_t ** list)
286 {
287     MPIDIG_rreq_t *curr, *tmp;
288     MPIR_Request *req = NULL;
289     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_FIND_UNEXP);
290     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_FIND_UNEXP);
291 
292     MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);
293     DL_FOREACH_SAFE(MPIDI_global.unexp_list, curr, tmp) {
294         MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
295         req = (MPIR_Request *) curr->request;
296         if (MPIDIG_match_unexp(rank, tag, context_id, req)) {
297             break;
298         }
299         req = NULL;
300     }
301     MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);
302     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_FIND_UNEXP);
303     return req;
304 }
305 
MPIDIG_dequeue_posted(int rank,int tag,MPIR_Context_id_t context_id,int is_local,MPIDIG_rreq_t ** list)306 MPL_STATIC_INLINE_PREFIX MPIR_Request *MPIDIG_dequeue_posted(int rank, int tag,
307                                                              MPIR_Context_id_t context_id,
308                                                              int is_local, MPIDIG_rreq_t ** list)
309 {
310     MPIR_Request *req = NULL;
311     MPIDIG_rreq_t *curr, *tmp;
312     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DEQUEUE_POSTED);
313     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DEQUEUE_POSTED);
314 
315     MPIR_T_PVAR_TIMER_START(RECVQ, time_failed_matching_postedq);
316     DL_FOREACH_SAFE(MPIDI_global.posted_list, curr, tmp) {
317         MPIR_T_PVAR_COUNTER_INC(RECVQ, posted_recvq_match_attempts, 1);
318         req = (MPIR_Request *) curr->request;
319         if (MPIDIG_match_posted(rank, tag, context_id, req)) {
320             DL_DELETE(MPIDI_global.posted_list, curr);
321             MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);
322             break;
323         }
324         req = NULL;
325     }
326     if (!req)
327         MPIR_T_PVAR_TIMER_END(RECVQ, time_failed_matching_postedq);
328 
329     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DEQUEUE_POSTED);
330     return req;
331 }
332 
MPIDIG_delete_posted(MPIDIG_rreq_t * req,MPIDIG_rreq_t ** list)333 MPL_STATIC_INLINE_PREFIX int MPIDIG_delete_posted(MPIDIG_rreq_t * req, MPIDIG_rreq_t ** list)
334 {
335     int found = 0;
336     MPIDIG_rreq_t *curr, *tmp;
337     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_DELETE_POSTED);
338     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_DELETE_POSTED);
339     MPIR_T_PVAR_TIMER_START(RECVQ, time_failed_matching_postedq);
340     DL_FOREACH_SAFE(MPIDI_global.posted_list, curr, tmp) {
341         MPIR_T_PVAR_COUNTER_INC(RECVQ, posted_recvq_match_attempts, 1);
342         if (curr == req) {
343             DL_DELETE(MPIDI_global.posted_list, curr);
344             found = 1;
345             MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);
346             break;
347         }
348     }
349     if (!found)
350         MPIR_T_PVAR_TIMER_END(RECVQ, time_failed_matching_postedq);
351 
352     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_DELETE_POSTED);
353     return found;
354 }
355 
356 #endif /* MPIDI_CH4U_USE_PER_COMM_QUEUE */
357 
358 #endif /* CH4R_RECVQ_H_INCLUDED */
359