1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3  *  (C) 2001 by Argonne National Laboratory.
4  *      See COPYRIGHT in top-level directory.
5  */
6 
7 #include "mpidimpl.h"
8 
/* MPIDI_POSTED_RECV_ENQUEUE_HOOK(req): Notifies the channel that req has
   been enqueued on the posted recv queue.  Returns void.  If the macro is
   already defined when this point is reached, that definition is used;
   otherwise the default below expands to an empty statement. */
#ifndef MPIDI_POSTED_RECV_ENQUEUE_HOOK
#define MPIDI_POSTED_RECV_ENQUEUE_HOOK(req) do{}while(0)
#endif
/* MPIDI_POSTED_RECV_DEQUEUE_HOOK(req): Notifies the channel that req has
   been dequeued from the posted recv queue.  Returns non-zero if the
   channel has already matched the request; 0 otherwise.  This happens
   when the channel supports shared-memory and network communication
   with a network capable of matching, and the same request is matched
   by the network and, e.g., shared-memory.  When that happens the
   dequeue functions below should either search for the next matching
   request or report that no request was found.  The default below
   always evaluates to 0 (the channel never claims the request). */
#ifndef MPIDI_POSTED_RECV_DEQUEUE_HOOK
#define MPIDI_POSTED_RECV_DEQUEUE_HOOK(req) 0
#endif
25 
26 /* FIXME:
27  * Recvq_lock/unlock removed because it is not needed for the SINGLE_CS
28  * approach and we might want a different, non-lock-based approach in
29  * a finer-grained thread-sync version.  For example,
30  * some routines can be implemented in a lock-free
31  * fashion (because the user is required to guarantee non-conflicting
32  * accesses, such as doing a probe and a receive that matches in different
33  * threads).
34  *
35  * There are a lot of routines here.  Do we really need them all?
36  *
 * The search criteria can be implemented in a single 64-bit compare on
 * systems with efficient 64-bit operations.  The rank and contextid can also
 * in many cases be combined into a single 32-bit word for the comparison
 * (in which case the message info should be stored in the queue in a
 * naturally aligned, 64-bit word).
42  *
43  */
44 
45 static MPID_Request * recvq_posted_head = 0;
46 static MPID_Request * recvq_posted_tail = 0;
47 static MPID_Request * recvq_unexpected_head = 0;
48 static MPID_Request * recvq_unexpected_tail = 0;
49 
50 /* Export the location of the queue heads if debugger support is enabled.
51  * This allows the queue code to rely on the local variables for the
52  * queue heads while also exporting those variables to the debugger.
53  * See src/mpi/debugger/dll_mpich2.c for how this is used to
54  * access the message queues.
55  */
56 #ifdef HAVE_DEBUGGER_SUPPORT
57 MPID_Request ** const MPID_Recvq_posted_head_ptr     = &recvq_posted_head;
58 MPID_Request ** const MPID_Recvq_unexpected_head_ptr = &recvq_unexpected_head;
59 #endif
60 
/* TODO decide whether to control this independently via configure or with
 * the existing --enable-timing option (#ifdef COLLECT_STATS) */
#define ENABLE_RECVQ_STATISTICS 1
#ifdef ENABLE_RECVQ_STATISTICS
/* Current number of requests on the posted / unexpected queue; adjusted
 * with MPIR_T_INC/MPIR_T_DEC wherever a request is linked or unlinked. */
static unsigned int posted_qlen = 0;
static unsigned int unexpected_qlen = 0;
/* Running totals of match comparisons made while searching each queue. */
static MPI_Aint posted_recvq_match_attempts = 0;
static MPI_Aint unexpected_recvq_match_attempts = 0;
/* TODO add some code here and probably elsewhere to make these show up in the
 * MPIX_T_pvar_ interface */
/* With statistics enabled both macros are expressions whose value is the
 * updated counter. */
#define MPIR_T_INC(x) (++(x))
#define MPIR_T_DEC(x) (--(x))

#else

/* With statistics disabled both macros expand to nothing, so in this
 * configuration they are only valid as complete statements; any use as an
 * operand inside a larger expression will not compile. */
#define MPIR_T_INC(x)
#define MPIR_T_DEC(x)

#endif /* defined(ENABLE_RECVQ_STATISTICS) */
80 
/* If the MPIDI_Message_match structure fits into a pointer size, we
 * can directly work on it */
/* Every macro below selects its comparison with a sizeof test: when
 * MPIDI_Message_match is exactly SIZEOF_VOID_P bytes the "whole" member is
 * compared in a single operation; otherwise the rank, tag and context_id
 * parts are compared one after the other.  Arguments are evaluated more
 * than once, so they must be free of side effects. */

/* MATCH_WITH_NO_MASK compares the match values without masking
 * them. This is useful for the case where there are no ANY_TAG or
 * ANY_SOURCE wild cards. */
#define MATCH_WITH_NO_MASK(match1, match2)                              \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ? ((match1).whole == (match2).whole) : \
     (((match1).parts.rank == (match2).parts.rank) &&                   \
      ((match1).parts.tag == (match2).parts.tag) &&                     \
      ((match1).parts.context_id == (match2).parts.context_id)))

/* MATCH_WITH_LEFT_MASK applies the mask to the left value only and compares
 * the result with the right value as-is.  In this file the left value is
 * the match of a request on the unexpected queue, and the right value is
 * the search key whose wild-carded fields (ANY_TAG / ANY_SOURCE) have been
 * set to 0 together with the corresponding mask fields.  In the
 * field-by-field form context_id is always compared unmasked. */
#define MATCH_WITH_LEFT_MASK(match1, match2, mask)                      \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ?                   \
     (((match1).whole & (mask).whole) == (match2).whole) :              \
     ((((match1).parts.rank & (mask).parts.rank) == (match2).parts.rank) && \
      (((match1).parts.tag & (mask).parts.tag) == (match2).parts.tag) && \
      ((match1).parts.context_id == (match2).parts.context_id)))

/* This is the most general case where both matches have to be
 * masked. Both matches are masked with the same value. There doesn't
 * seem to be a need for two different masks at this time.  In the
 * field-by-field form context_id is always compared unmasked. */
#define MATCH_WITH_LEFT_RIGHT_MASK(match1, match2, mask)                \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ?                   \
     (((match1).whole & (mask).whole) == ((match2).whole & (mask).whole)) : \
     ((((match1).parts.rank & (mask).parts.rank) == ((match2).parts.rank & (mask).parts.rank)) && \
      (((match1).parts.tag & (mask).parts.tag) == ((match2).parts.tag & (mask).parts.tag)) && \
      ((match1).parts.context_id == (match2).parts.context_id)))
112 
113 /* will be invoked to populate the custom parts of pvar_handle objects */
simple_uint_creator(void * obj_handle,struct MPIR_T_pvar_handle * handle,int * countp)114 static int simple_uint_creator(void *obj_handle,
115                                struct MPIR_T_pvar_handle *handle,
116                                int *countp)
117 {
118     /* the IMPL_SIMPLE code reads/writes "bytes" bytes from the location given
119      * by the "handle_state" pointer */
120     handle->handle_state = handle->info->var_state;
121     handle->bytes = sizeof(unsigned int);
122 
123     /* a single unsigned int should be read/written */
124     *countp = 1;
125     return MPI_SUCCESS;
126 }
127 
simple_aint_creator(void * obj_handle,struct MPIR_T_pvar_handle * handle,int * countp)128 static int simple_aint_creator(void *obj_handle,
129                                struct MPIR_T_pvar_handle *handle,
130                                int *countp)
131 {
132     /* the IMPL_SIMPLE code reads/writes "bytes" bytes from the location given
133      * by the "handle_state" pointer */
134     handle->handle_state = handle->info->var_state;
135     handle->bytes = sizeof(MPI_Aint);
136 
137     /* a single Aint should be read/written */
138     *countp = 1;
139     return MPI_SUCCESS;
140 }
141 
142 #undef FUNCNAME
143 #define FUNCNAME MPIDI_CH3U_Recvq_FU
144 #undef FCNAME
145 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_init(void)146 int MPIDI_CH3U_Recvq_init(void)
147 {
148     int mpi_errno = MPI_SUCCESS;
149 #ifdef ENABLE_RECVQ_STATISTICS
150     int idx = -1;
151     mpi_errno = MPIR_T_pvar_add("posted_recvq_length",
152                                 MPIX_T_VERBOSITY_USER_DETAIL,
153                                 MPIX_T_PVAR_CLASS_LEVEL,
154                                 MPI_UNSIGNED,
155                                 MPIX_T_ENUM_NULL,
156                                 "length of the posted message receive queue",
157                                 MPIX_T_BIND_NO_OBJECT,
158                                 /*readonly=*/TRUE,
159                                 /*continuous=*/TRUE,
160                                 /*atomic=*/FALSE,
161                                 MPIR_T_PVAR_IMPL_SIMPLE,
162                                 /*var_state=*/&posted_qlen,
163                                 &simple_uint_creator,
164                                 &idx);
165     if (mpi_errno) MPIU_ERR_POP(mpi_errno);
166 
167     mpi_errno = MPIR_T_pvar_add("unexpected_recvq_length",
168                                 MPIX_T_VERBOSITY_USER_DETAIL,
169                                 MPIX_T_PVAR_CLASS_LEVEL,
170                                 MPI_UNSIGNED,
171                                 MPIX_T_ENUM_NULL,
172                                 "length of the unexpected messsage receive queue",
173                                 MPIX_T_BIND_NO_OBJECT,
174                                 /*readonly=*/TRUE,
175                                 /*continuous=*/TRUE,
176                                 /*atomic=*/FALSE,
177                                 MPIR_T_PVAR_IMPL_SIMPLE,
178                                 /*var_state=*/&unexpected_qlen,
179                                 &simple_uint_creator,
180                                 &idx);
181     if (mpi_errno) MPIU_ERR_POP(mpi_errno);
182 
183     /* posted receive queue failed matches */
184     mpi_errno = MPIR_T_pvar_add("posted_recvq_match_attempts",
185                                 MPIX_T_VERBOSITY_USER_DETAIL,
186                                 MPIX_T_PVAR_CLASS_COUNTER,
187                                 MPI_AINT,
188                                 MPIX_T_ENUM_NULL,
189                                 "number of search passes on the messsage receive queue",
190                                 MPIX_T_BIND_NO_OBJECT,
191                                 /*readonly=*/FALSE,
192                                 /*continuous=*/TRUE,
193                                 /*atomic=*/FALSE,
194                                 MPIR_T_PVAR_IMPL_SIMPLE,
195                                 /*var_state=*/&posted_recvq_match_attempts,
196                                 &simple_aint_creator,
197                                 &idx);
198     if (mpi_errno) MPIU_ERR_POP(mpi_errno);
199 
200     /* unexpected receive queue failed matches */
201     mpi_errno = MPIR_T_pvar_add("unexpected_recvq_match_attempts",
202                                 MPIX_T_VERBOSITY_USER_DETAIL,
203                                 MPIX_T_PVAR_CLASS_COUNTER,
204                                 MPI_AINT,
205                                 MPIX_T_ENUM_NULL,
206                                 "number of search passes on the messsage receive queue",
207                                 MPIX_T_BIND_NO_OBJECT,
208                                 /*readonly=*/FALSE,
209                                 /*continuous=*/TRUE,
210                                 /*atomic=*/FALSE,
211                                 MPIR_T_PVAR_IMPL_SIMPLE,
212                                 /*var_state=*/&unexpected_recvq_match_attempts,
213                                 &simple_aint_creator,
214                                 &idx);
215     if (mpi_errno) MPIU_ERR_POP(mpi_errno);
216 #endif
217 fn_fail:
218     return mpi_errno;
219 }
220 
221 /* FIXME: If this routine is only used by probe/iprobe, then we don't need
222    to set the cancelled field in status (only set for nonblocking requests) */
223 /*
224  * MPIDI_CH3U_Recvq_FU()
225  *
226  * Search for a matching request in the unexpected receive queue.  Return
227  * true if one is found, false otherwise.  If the status arguement is
228  * not MPI_STATUS_IGNORE, return information about the request in that
229  * parameter.  This routine is used by mpid_probe and mpid_iprobe.
230  *
231  * Multithread - As this is a read-only routine, it need not
232  * require an external critical section (careful organization of the
233  * queue updates would not even require a critical section within this
234  * routine).  However, this routine is used both from within the progress
235  * engine and from without it.  To make that work with the current
236  * design for MSGQUEUE and the brief-global mode, the critical section
237  * is *outside* of this routine.
238  *
239  * This routine is used only in mpid_iprobe and mpid_probe
240  *
241  */
242 #undef FUNCNAME
243 #define FUNCNAME MPIDI_CH3U_Recvq_FU
244 #undef FCNAME
245 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FU(int source,int tag,int context_id,MPI_Status * s)246 int MPIDI_CH3U_Recvq_FU(int source, int tag, int context_id, MPI_Status *s)
247 {
248     MPID_Request * rreq;
249     int found = 0;
250     MPIDI_Message_match match, mask;
251     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FU);
252 
253     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FU);
254 
255     MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
256 
257     rreq = recvq_unexpected_head;
258 
259     match.parts.context_id = context_id;
260     match.parts.tag = tag;
261     match.parts.rank = source;
262 
263     if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) {
264 	while (rreq != NULL) {
265             MPIR_T_INC(unexpected_recvq_match_attempts);
266 	    if (MATCH_WITH_NO_MASK(rreq->dev.match, match))
267 		break;
268 	    rreq = rreq->dev.next;
269 	}
270     }
271     else {
272 	mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
273 	if (tag == MPI_ANY_TAG)
274 	    match.parts.tag = mask.parts.tag = 0;
275 	if (source == MPI_ANY_SOURCE)
276 	    match.parts.rank = mask.parts.rank = 0;
277 
278 	while (rreq != NULL) {
279             MPIR_T_INC(unexpected_recvq_match_attempts);
280 	    if (MATCH_WITH_LEFT_MASK(rreq->dev.match, match, mask))
281 		break;
282 	    rreq = rreq->dev.next;
283 	}
284     }
285 
286     /* Save the information about the request before releasing the
287        queue */
288     if (rreq) {
289 	if (s != MPI_STATUS_IGNORE) {
290 	    /* Avoid setting "extra" fields like MPI_ERROR */
291 	    s->MPI_SOURCE = rreq->status.MPI_SOURCE;
292 	    s->MPI_TAG    = rreq->status.MPI_TAG;
293 	    s->count      = rreq->status.count;
294 	    s->cancelled  = rreq->status.cancelled;
295 	}
296 	found = 1;
297     }
298 
299     MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FU);
300     return found;
301 }
302 
303 /*
304  * MPIDI_CH3U_Recvq_FDU()
305  *
306  * Find a request in the unexpected queue and dequeue it; otherwise return NULL.
307  *
308  * Multithread - This routine must be atomic (since it dequeues a
309  * request).  However, once the request is dequeued, no other thread can
310  * see it, so this routine provides its own atomicity.
311  *
312  * This routine is used only in the case of send_cancel.  However, it is used both
313  * within mpid_send_cancel and within a packet handler.
314  */
315 #undef FUNCNAME
316 #define FUNCNAME MPIDI_CH3U_Recvq_FDU
317 #undef FCNAME
318 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDU(MPI_Request sreq_id,MPIDI_Message_match * match)319 MPID_Request * MPIDI_CH3U_Recvq_FDU(MPI_Request sreq_id,
320 				    MPIDI_Message_match * match)
321 {
322     MPID_Request * rreq;
323     MPID_Request * prev_rreq;
324     MPID_Request * cur_rreq;
325     MPID_Request * matching_prev_rreq;
326     MPID_Request * matching_cur_rreq;
327     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);
328 
329     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);
330 
331     MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
332 
333     matching_prev_rreq = NULL;
334     matching_cur_rreq = NULL;
335     prev_rreq = NULL;
336 
337     /* Note that since this routine is used only in the case of send_cancel,
338        there can be only one match if at all. */
339     /* FIXME: Why doesn't this exit after it finds the first match? */
340     cur_rreq = recvq_unexpected_head;
341     while (cur_rreq != NULL) {
342 	if (cur_rreq->dev.sender_req_id == sreq_id &&
343             MPIR_T_INC(unexpected_recvq_match_attempts) &&
344 	    (MATCH_WITH_NO_MASK(cur_rreq->dev.match, *match))) {
345 	    matching_prev_rreq = prev_rreq;
346 	    matching_cur_rreq = cur_rreq;
347 	}
348 	prev_rreq = cur_rreq;
349 	cur_rreq = cur_rreq->dev.next;
350     }
351 
352     if (matching_cur_rreq != NULL) {
353 	if (matching_prev_rreq != NULL) {
354 	    matching_prev_rreq->dev.next = matching_cur_rreq->dev.next;
355 	}
356 	else {
357 	    recvq_unexpected_head = matching_cur_rreq->dev.next;
358 	}
359 
360 	if (matching_cur_rreq->dev.next == NULL) {
361 	    recvq_unexpected_tail = matching_prev_rreq;
362 	}
363 
364         MPIR_T_DEC(unexpected_qlen);
365 	rreq = matching_cur_rreq;
366     }
367     else {
368 	rreq = NULL;
369     }
370 
371     MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);
372     return rreq;
373 }
374 
375 /* TODO rename the old FDU and use that name for this one */
376 /* This is the routine that you expect to be named "_FDU".  It implements the
377  * behavior needed for improbe; specifically, searching the receive queue for
378  * the first matching request and dequeueing it. */
379 #undef FUNCNAME
380 #define FUNCNAME MPIDI_CH3U_Recvq_FDU_matchonly
381 #undef FCNAME
382 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDU_matchonly(int source,int tag,int context_id,MPID_Comm * comm,int * foundp)383 MPID_Request * MPIDI_CH3U_Recvq_FDU_matchonly(int source, int tag, int context_id, MPID_Comm *comm, int *foundp)
384 {
385     int found = FALSE;
386     MPID_Request *rreq, *prev_rreq;
387     MPIDI_Message_match match;
388     MPIDI_Message_match mask;
389     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);
390 
391     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);
392 
393     MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
394 
395     /* Optimize this loop for an empty unexpected receive queue */
396     rreq = recvq_unexpected_head;
397     if (rreq) {
398         prev_rreq = NULL;
399 
400         match.parts.context_id = context_id;
401         match.parts.tag = tag;
402         match.parts.rank = source;
403 
404         if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) {
405             do {
406                 MPIR_T_INC(unexpected_recvq_match_attempts);
407                 if (MATCH_WITH_NO_MASK(rreq->dev.match, match)) {
408                     if (prev_rreq != NULL) {
409                         prev_rreq->dev.next = rreq->dev.next;
410                     }
411                     else {
412                         recvq_unexpected_head = rreq->dev.next;
413                     }
414 
415                     if (rreq->dev.next == NULL) {
416                         recvq_unexpected_tail = prev_rreq;
417                     }
418                     MPIR_T_DEC(unexpected_qlen);
419 
420                     rreq->comm = comm;
421                     MPIR_Comm_add_ref(comm);
422                     /* don't have the (buf,count,type) info right now, can't add
423                      * it to the request */
424                     found = TRUE;
425                     goto lock_exit;
426                 }
427                 prev_rreq = rreq;
428                 rreq      = rreq->dev.next;
429             } while (rreq);
430         }
431         else {
432             mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
433             if (tag == MPI_ANY_TAG)
434                 match.parts.tag = mask.parts.tag = 0;
435             if (source == MPI_ANY_SOURCE)
436                 match.parts.rank = mask.parts.rank = 0;
437 
438             do {
439                 MPIR_T_INC(unexpected_recvq_match_attempts);
440                 if (MATCH_WITH_LEFT_MASK(rreq->dev.match, match, mask)) {
441                     if (prev_rreq != NULL) {
442                         prev_rreq->dev.next = rreq->dev.next;
443                     }
444                     else {
445                         recvq_unexpected_head = rreq->dev.next;
446                     }
447                     if (rreq->dev.next == NULL) {
448                         recvq_unexpected_tail = prev_rreq;
449                     }
450                     MPIR_T_DEC(unexpected_qlen);
451 
452                     rreq->comm                 = comm;
453                     MPIR_Comm_add_ref(comm);
454                     /* don't have the (buf,count,type) info right now, can't add
455                      * it to the request */
456                     found = TRUE;
457                     goto lock_exit;
458                 }
459                 prev_rreq = rreq;
460                 rreq = rreq->dev.next;
461             } while (rreq);
462         }
463     }
464 
465 lock_exit:
466     *foundp = found;
467 
468     MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);
469     return rreq;
470 }
471 
472 /*
473  * MPIDI_CH3U_Recvq_FDU_or_AEP()
474  *
475  * Atomically find a request in the unexpected queue and dequeue it, or
476  * allocate a new request and enqueue it in the posted queue
477  *
478  * Multithread - This routine must be called from within a MSGQUEUE
479  * critical section.  If a request is allocated, it must not release
480  * the MSGQUEUE until the request is completely valid, as another thread
481  * may then find it and dequeue it.
482  *
483  * This routine is used in mpid_irecv and mpid_recv.
484  *
485  */
486 #undef FUNCNAME
487 #define FUNCNAME MPIDI_CH3U_Recvq_FDU_or_AEP
488 #undef FCNAME
489 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDU_or_AEP(int source,int tag,int context_id,MPID_Comm * comm,void * user_buf,int user_count,MPI_Datatype datatype,int * foundp)490 MPID_Request * MPIDI_CH3U_Recvq_FDU_or_AEP(int source, int tag,
491                                            int context_id, MPID_Comm *comm, void *user_buf,
492                                            int user_count, MPI_Datatype datatype, int * foundp)
493 {
494     int found;
495     MPID_Request *rreq, *prev_rreq;
496     MPIDI_Message_match match;
497     MPIDI_Message_match mask;
498     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);
499 
500     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);
501 
502     MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
503 
504     /* Optimize this loop for an empty unexpected receive queue */
505     rreq = recvq_unexpected_head;
506     if (rreq) {
507 	prev_rreq = NULL;
508 
509 	match.parts.context_id = context_id;
510 	match.parts.tag = tag;
511 	match.parts.rank = source;
512 
513 	if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) {
514 	    do {
515                 MPIR_T_INC(unexpected_recvq_match_attempts);
516 		if (MATCH_WITH_NO_MASK(rreq->dev.match, match)) {
517 		    if (prev_rreq != NULL) {
518 			prev_rreq->dev.next = rreq->dev.next;
519 		    }
520 		    else {
521 			recvq_unexpected_head = rreq->dev.next;
522 		    }
523 
524 		    if (rreq->dev.next == NULL) {
525 			recvq_unexpected_tail = prev_rreq;
526 		    }
527                     MPIR_T_DEC(unexpected_qlen);
528 
529 		    rreq->comm = comm;
530 		    MPIR_Comm_add_ref(comm);
531 		    rreq->dev.user_buf = user_buf;
532 		    rreq->dev.user_count = user_count;
533 		    rreq->dev.datatype = datatype;
534 		    found = TRUE;
535 		    goto lock_exit;
536 		}
537 		prev_rreq = rreq;
538 		rreq      = rreq->dev.next;
539 	    } while (rreq);
540 	}
541 	else {
542 	    mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
543 	    if (tag == MPI_ANY_TAG)
544 		match.parts.tag = mask.parts.tag = 0;
545 	    if (source == MPI_ANY_SOURCE)
546 		match.parts.rank = mask.parts.rank = 0;
547 
548 	    do {
549                 MPIR_T_INC(unexpected_recvq_match_attempts);
550 		if (MATCH_WITH_LEFT_MASK(rreq->dev.match, match, mask)) {
551 		    if (prev_rreq != NULL) {
552 			prev_rreq->dev.next = rreq->dev.next;
553 		    }
554 		    else {
555 			recvq_unexpected_head = rreq->dev.next;
556 		    }
557 		    if (rreq->dev.next == NULL) {
558 			recvq_unexpected_tail = prev_rreq;
559 		    }
560                     MPIR_T_DEC(unexpected_qlen);
561 
562 		    rreq->comm                 = comm;
563 		    MPIR_Comm_add_ref(comm);
564 		    rreq->dev.user_buf         = user_buf;
565 		    rreq->dev.user_count       = user_count;
566 		    rreq->dev.datatype         = datatype;
567 		    found = TRUE;
568 		    goto lock_exit;
569 		}
570 		prev_rreq = rreq;
571 		rreq = rreq->dev.next;
572 	    } while (rreq);
573 	}
574     }
575 
576     /* A matching request was not found in the unexpected queue, so we
577        need to allocate a new request and add it to the posted queue */
578     {
579 	int mpi_errno = MPI_SUCCESS;
580 
581         found = FALSE;
582 
583 	MPIDI_Request_create_rreq( rreq, mpi_errno, goto lock_exit );
584 	rreq->dev.match.parts.tag	   = tag;
585 	rreq->dev.match.parts.rank	   = source;
586 	rreq->dev.match.parts.context_id   = context_id;
587 
588 	/* Added a mask for faster search on 64-bit capable
589 	 * platforms */
590 	rreq->dev.mask.parts.context_id = ~0;
591 	if (rreq->dev.match.parts.rank == MPI_ANY_SOURCE)
592 	    rreq->dev.mask.parts.rank = 0;
593 	else
594 	    rreq->dev.mask.parts.rank = ~0;
595 	if (rreq->dev.match.parts.tag == MPI_ANY_TAG)
596 	    rreq->dev.mask.parts.tag = 0;
597 	else
598 	    rreq->dev.mask.parts.tag = ~0;
599 
600         rreq->comm                 = comm;
601         MPIR_Comm_add_ref(comm);
602         rreq->dev.user_buf         = user_buf;
603         rreq->dev.user_count       = user_count;
604         rreq->dev.datatype         = datatype;
605 
606         /* check whether VC has failed, or this is an ANY_SOURCE in a
607            failed communicator */
608         if (source != MPI_ANY_SOURCE) {
609             MPIDI_VC_t *vc;
610             MPIDI_Comm_get_vc(comm, source, &vc);
611             if (vc->state == MPIDI_VC_STATE_MORIBUND) {
612                 MPIU_ERR_SET1(mpi_errno, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail", "**comm_fail %d", vc->pg_rank);
613                 rreq->status.MPI_ERROR = mpi_errno;
614                 MPIDI_CH3U_Request_complete(rreq);
615                 goto lock_exit;
616             }
617         } else if (!MPIDI_CH3I_Comm_AS_enabled(comm)) {
618             MPIU_ERR_SET(mpi_errno, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail");
619             rreq->status.MPI_ERROR = mpi_errno;
620             MPIDI_CH3U_Request_complete(rreq);
621             goto lock_exit;
622         }
623 
624 	rreq->dev.next = NULL;
625 	if (recvq_posted_tail != NULL) {
626 	    recvq_posted_tail->dev.next = rreq;
627 	}
628 	else {
629 	    recvq_posted_head = rreq;
630 	}
631 	recvq_posted_tail = rreq;
632         MPIR_T_INC(posted_qlen);
633 	MPIDI_POSTED_RECV_ENQUEUE_HOOK(rreq);
634     }
635 
636   lock_exit:
637 
638     *foundp = found;
639 
640     MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);
641     return rreq;
642 }
643 
644 
645 /*
646  * MPIDI_CH3U_Recvq_DP()
647  *
648  * Given an existing request, dequeue that request from the posted queue, or
649  * return NULL if the request was not in the posted queued
650  *
651  * Multithread - This routine is atomic
652  */
653 #undef FUNCNAME
654 #define FUNCNAME MPIDI_CH3U_Recvq_DP
655 #undef FCNAME
656 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_DP(MPID_Request * rreq)657 int MPIDI_CH3U_Recvq_DP(MPID_Request * rreq)
658 {
659     int found;
660     MPID_Request * cur_rreq;
661     MPID_Request * prev_rreq;
662     int dequeue_failed;
663     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_DP);
664 
665     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_DP);
666 
667     found = FALSE;
668     prev_rreq = NULL;
669 
670     /* MT FIXME is this right? or should the caller do this? */
671     MPIU_THREAD_CS_ENTER(MSGQUEUE,);
672     cur_rreq = recvq_posted_head;
673     while (cur_rreq != NULL) {
674 	if (cur_rreq == rreq) {
675 	    if (prev_rreq != NULL) {
676 		prev_rreq->dev.next = cur_rreq->dev.next;
677 	    }
678 	    else {
679 		recvq_posted_head = cur_rreq->dev.next;
680 	    }
681 	    if (cur_rreq->dev.next == NULL) {
682 		recvq_posted_tail = prev_rreq;
683 	    }
684             MPIR_T_DEC(posted_qlen);
685             /* Notify channel that rreq has been dequeued and check if
686                it has already matched rreq, fail if so */
687 	    dequeue_failed = MPIDI_POSTED_RECV_DEQUEUE_HOOK(rreq);
688             if (!dequeue_failed)
689                 found = TRUE;
690 	    break;
691 	}
692 
693 	prev_rreq = cur_rreq;
694 	cur_rreq = cur_rreq->dev.next;
695     }
696 
697     MPIU_THREAD_CS_EXIT(MSGQUEUE,);
698 
699     MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_DP);
700     return found;
701 }
702 
703 /*
704  * MPIDI_CH3U_Recvq_FDP_or_AEU()
705  *
706  * Locate a request in the posted queue and dequeue it, or allocate a new
707  * request and enqueue it in the unexpected queue
708  *
709  * Multithread - This routine must be called from within a MSGQUEUE
710  * critical section.  If a request is allocated, it must not release
711  * the MSGQUEUE until the request is completely valid, as another thread
712  * may then find it and dequeue it.
713  *
714  * This routine is used in ch3u_eager, ch3u_eagersync, ch3u_handle_recv_pkt,
715  * ch3u_rndv, and mpidi_isend_self.  Routines within the progress engine
716  * will need to be careful to avoid nested critical sections.
717  *
718  * FIXME: Currently, the routines called from within the progress engine
719  * do not use the MSGQUEUE CS, because in the brief-global mode, that
720  * simply uses the global_mutex .
721  */
722 #undef FUNCNAME
723 #define FUNCNAME MPIDI_CH3U_Recvq_FDP_or_AEU
724 #undef FCNAME
725 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match,int * foundp)726 MPID_Request * MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match,
727 					   int * foundp)
728 {
729     int found;
730     MPID_Request * rreq;
731     MPID_Request * prev_rreq;
732     int channel_matched;
733     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);
734 
735     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);
736 
737     MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
738 
739  top_loop:
740     prev_rreq = NULL;
741 
742     rreq = recvq_posted_head;
743 
744     while (rreq != NULL) {
745         MPIR_T_INC(posted_recvq_match_attempts);
746 	if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, *match, rreq->dev.mask)) {
747 	    if (prev_rreq != NULL) {
748 		prev_rreq->dev.next = rreq->dev.next;
749 	    }
750 	    else {
751 		recvq_posted_head = rreq->dev.next;
752 	    }
753 	    if (rreq->dev.next == NULL) {
754 		recvq_posted_tail = prev_rreq;
755 	    }
756             MPIR_T_DEC(posted_qlen);
757 
758             /* give channel a chance to match the request, try again if so */
759 	    channel_matched = MPIDI_POSTED_RECV_DEQUEUE_HOOK(rreq);
760             if (channel_matched)
761                 goto top_loop;
762 
763 	    found = TRUE;
764 	    goto lock_exit;
765 	}
766 	prev_rreq = rreq;
767 	rreq = rreq->dev.next;
768     }
769 
770     /* A matching request was not found in the posted queue, so we
771        need to allocate a new request and add it to the unexpected queue */
772     {
773 	int mpi_errno=0;
774 	MPIDI_Request_create_rreq( rreq, mpi_errno,
775 				   found=FALSE;goto lock_exit );
776         MPIU_Assert(mpi_errno == 0);
777         rreq->dev.recv_pending_count = 1;
778 	rreq->dev.match	= *match;
779 	rreq->dev.next	= NULL;
780 	if (recvq_unexpected_tail != NULL) {
781 	    recvq_unexpected_tail->dev.next = rreq;
782 	}
783 	else {
784 	    recvq_unexpected_head = rreq;
785 	}
786 	recvq_unexpected_tail = rreq;
787         MPIR_T_INC(unexpected_qlen);
788     }
789 
790     found = FALSE;
791 
792   lock_exit:
793 
794     *foundp = found;
795 
796     MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);
797     return rreq;
798 }
799 
800 /* returns TRUE iff the request was sent on the vc */
req_uses_vc(const MPID_Request * req,const MPIDI_VC_t * vc)801 static inline int req_uses_vc(const MPID_Request* req, const MPIDI_VC_t *vc)
802 {
803     MPIDI_VC_t *vc1;
804 
805     MPIDI_Comm_get_vc(req->comm, req->dev.match.parts.rank, &vc1);
806     return vc == vc1;
807 }
808 
809 #undef FUNCNAME
810 #define FUNCNAME dequeue_and_set_error
811 #undef FCNAME
812 #define FCNAME MPIU_QUOTE(FUNCNAME)
813 /* This dequeues req from the posted recv queue, set req's error code to comm_fail, and updates the req pointer.
814    Note that this creates a new error code if one hasn't already been created (i.e., if *error is MPI_SUCCESS). */
dequeue_and_set_error(MPID_Request ** req,MPID_Request * prev_req,int * error,int rank)815 static inline void dequeue_and_set_error(MPID_Request **req,  MPID_Request *prev_req, int *error, int rank)
816 {
817     MPID_Request *next = (*req)->dev.next;
818 
819     if (*error == MPI_SUCCESS) {
820         if (rank == MPI_PROC_NULL)
821             MPIU_ERR_SET(*error, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail");
822         else
823             MPIU_ERR_SET1(*error, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail", "**comm_fail %d", rank);
824     }
825 
826     /* remove from queue */
827     if (recvq_posted_head == *req)
828         recvq_posted_head = (*req)->dev.next;
829     else
830         prev_req->dev.next = (*req)->dev.next;
831     if (recvq_posted_tail == *req)
832         recvq_posted_tail = prev_req;
833 
834     MPIR_T_DEC(posted_qlen);
835 
836     /* set error and complete */
837     (*req)->status.MPI_ERROR = *error;
838     MPIDI_CH3U_Request_complete(*req);
839     MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
840                      (MPIU_DBG_FDEST, "set error of req %p (%#08x) to %#x and completing.",
841                       *req, (*req)->handle, *error));
842     *req = next;
843 }
844 
845 #undef FUNCNAME
846 #define FUNCNAME MPIDI_CH3U_Complete_disabled_anysources
847 #undef FCNAME
848 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDI_CH3U_Complete_disabled_anysources(void)849 int MPIDI_CH3U_Complete_disabled_anysources(void)
850 {
851     int mpi_errno = MPI_SUCCESS;
852     MPID_Request *req, *prev_req;
853     int error = MPI_SUCCESS;
854     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
855 
856     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
857     MPIU_THREAD_CS_ENTER(MSGQUEUE,);
858 
859     /* Check each request in the posted queue, and complete-with-error any
860        anysource requests posted on communicators that have disabled
861        anysources */
862     req = recvq_posted_head;
863     prev_req = NULL;
864     while (req) {
865         if (req->dev.match.parts.rank == MPI_ANY_SOURCE && !MPIDI_CH3I_Comm_AS_enabled(req->comm)) {
866             dequeue_and_set_error(&req, prev_req, &error, MPI_PROC_NULL); /* we don't know the rank of the failed proc */
867         } else {
868             prev_req = req;
869             req = req->dev.next;
870         }
871     }
872 
873  fn_exit:
874     MPIU_THREAD_CS_EXIT(MSGQUEUE,);
875 
876     MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
877     return mpi_errno;
878  fn_fail:
879     goto fn_exit;
880 }
881 
882 
883 #undef FUNCNAME
884 #define FUNCNAME MPIDU_Complete_posted_with_error
885 #undef FCNAME
886 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t * vc)887 int MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t *vc)
888 {
889     int mpi_errno = MPI_SUCCESS;
890     MPID_Request *req, *prev_req;
891     int error = MPI_SUCCESS;
892     MPIDI_STATE_DECL(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);
893 
894     MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);
895 
896     MPIU_THREAD_CS_ENTER(MSGQUEUE,);
897 
898     /* check each req in the posted queue and complete-with-error any requests
899        using this VC. */
900     req = recvq_posted_head;
901     prev_req = NULL;
902     while (req) {
903         if (req->dev.match.parts.rank != MPI_ANY_SOURCE && req_uses_vc(req, vc)) {
904             dequeue_and_set_error(&req, prev_req, &error, vc->pg_rank);
905         } else {
906             prev_req = req;
907             req = req->dev.next;
908         }
909     }
910 
911  fn_exit:
912     MPIU_THREAD_CS_EXIT(MSGQUEUE,);
913 
914     MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);
915     return mpi_errno;
916  fn_fail:
917     goto fn_exit;
918 }
919 
920 
921 /* --BEGIN ERROR HANDLING-- */
922 /* pretty prints tag, returns out for calling convenience */
tag_val_to_str(int tag,char * out,int max)923 static char *tag_val_to_str(int tag, char *out, int max)
924 {
925     if (tag == MPI_ANY_TAG) {
926         MPIU_Strncpy(out, "MPI_ANY_TAG", max);
927     }
928     else {
929         MPIU_Snprintf(out, max, "%d", tag);
930     }
931     return out;
932 }
933 
934 /* pretty prints rank, returns out for calling convenience */
rank_val_to_str(int rank,char * out,int max)935 static char *rank_val_to_str(int rank, char *out, int max)
936 {
937     if (rank == MPI_ANY_SOURCE) {
938         MPIU_Strncpy(out, "MPI_ANY_SOURCE", max);
939     }
940     else {
941         MPIU_Snprintf(out, max, "%d", rank);
942     }
943     return out;
944 }
945 
946 /* satisfy the compiler */
947 void MPIDI_CH3U_Dbg_print_recvq(FILE *stream);
948 
949 /* This function can be called by a debugger to dump the recvq state to the
950  * given stream. */
951 #undef FUNCNAME
952 #define FUNCNAME MPIDI_CH3U_Dbg_print_recvq
953 #undef FCNAME
954 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Dbg_print_recvq(FILE * stream)955 void MPIDI_CH3U_Dbg_print_recvq(FILE *stream)
956 {
957     MPID_Request * rreq;
958     int i;
959     char tag_buf[128];
960     char rank_buf[128];
961 
962     fprintf(stream, "========================================\n");
963     fprintf(stream, "MPI_COMM_WORLD  ctx=%#x rank=%d\n", MPIR_Process.comm_world->context_id, MPIR_Process.comm_world->rank);
964     fprintf(stream, "MPI_COMM_SELF   ctx=%#x\n", MPIR_Process.comm_self->context_id);
965     if (MPIR_Process.comm_parent) {
966         fprintf(stream, "MPI_COMM_PARENT ctx=%#x recvctx=%#x\n",
967                 MPIR_Process.comm_self->context_id,
968                 MPIR_Process.comm_parent->recvcontext_id);
969     }
970     else {
971         fprintf(stream, "MPI_COMM_PARENT (NULL)\n");
972     }
973 
974     fprintf(stream, "CH3 Posted RecvQ:\n");
975     rreq = recvq_posted_head;
976     i = 0;
977     while (rreq != NULL) {
978         fprintf(stream, "..[%d] rreq=%p ctx=%#x rank=%s tag=%s\n", i, rreq,
979                         rreq->dev.match.parts.context_id,
980                         rank_val_to_str(rreq->dev.match.parts.rank, rank_buf, sizeof(rank_buf)),
981                         tag_val_to_str(rreq->dev.match.parts.tag, tag_buf, sizeof(tag_buf)));
982         ++i;
983         rreq = rreq->dev.next;
984     }
985 
986     fprintf(stream, "CH3 Unexpected RecvQ:\n");
987     rreq = recvq_unexpected_head;
988     i = 0;
989     while (rreq != NULL) {
990         fprintf(stream, "..[%d] rreq=%p ctx=%#x rank=%s tag=%s\n", i, rreq,
991                         rreq->dev.match.parts.context_id,
992                         rank_val_to_str(rreq->dev.match.parts.rank, rank_buf, sizeof(rank_buf)),
993                         tag_val_to_str(rreq->dev.match.parts.tag, tag_buf, sizeof(tag_buf)));
994         fprintf(stream, "..    status.src=%s status.tag=%s\n",
995                         rank_val_to_str(rreq->status.MPI_SOURCE, rank_buf, sizeof(rank_buf)),
996                         tag_val_to_str(rreq->status.MPI_TAG, tag_buf, sizeof(tag_buf)));
997         ++i;
998         rreq = rreq->dev.next;
999     }
1000     fprintf(stream, "========================================\n");
1001 }
1002 /* --END ERROR HANDLING-- */
1003 
1004 /* returns the number of elements in the unexpected queue */
1005 #undef FUNCNAME
1006 #define FUNCNAME MPIDI_CH3U_Recvq_count_unexp
1007 #undef FCNAME
1008 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_count_unexp(void)1009 int MPIDI_CH3U_Recvq_count_unexp(void)
1010 {
1011     int count = 0;
1012     MPID_Request *req = recvq_unexpected_head;
1013 
1014     MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
1015 
1016     while (req)
1017     {
1018         ++count;
1019         req = req->dev.next;
1020     }
1021 
1022     return count;
1023 }
1024