1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 * (C) 2001 by Argonne National Laboratory.
4 * See COPYRIGHT in top-level directory.
5 */
6
7 #include "mpidimpl.h"
8
/* MPIDI_POSTED_RECV_ENQUEUE_HOOK(req): Notifies the channel that req has
   been enqueued on the posted recv queue. Returns void.  A channel may
   provide its own definition before this point; otherwise the hook is a
   no-op. */
#ifndef MPIDI_POSTED_RECV_ENQUEUE_HOOK
#define MPIDI_POSTED_RECV_ENQUEUE_HOOK(req) do{}while(0)
#endif
/* MPIDI_POSTED_RECV_DEQUEUE_HOOK(req): Notifies the channel that req has
   been dequeued from the posted recv queue. Returns non-zero if the
   channel has already matched the request; 0 otherwise. This happens
   when the channel supports shared-memory and network communication
   with a network capable of matching, and the same request is matched
   by the network and, e.g., shared-memory. When that happens, the
   dequeue functions below should either search for the next matching
   request or report that no request was found.  The default definition
   below always evaluates to 0 (the channel never reports a prior match). */
#ifndef MPIDI_POSTED_RECV_DEQUEUE_HOOK
#define MPIDI_POSTED_RECV_DEQUEUE_HOOK(req) 0
#endif
25
/* FIXME:
 * Recvq_lock/unlock removed because it is not needed for the SINGLE_CS
 * approach and we might want a different, non-lock-based approach in
 * a finer-grained thread-sync version. For example,
 * some routines can be implemented in a lock-free
 * fashion (because the user is required to guarantee non-conflicting
 * accesses, such as doing a probe and a receive that matches in different
 * threads).
 *
 * There are a lot of routines here. Do we really need them all?
 *
 * The search criteria can be implemented in a single 64-bit compare on
 * systems with efficient 64-bit operations. The rank and context id can also
 * in many cases be combined into a single 32-bit word for the comparison
 * (in which case the message info should be stored in the queue in a
 * naturally aligned, 64-bit word).
 *
 */
44
45 static MPID_Request * recvq_posted_head = 0;
46 static MPID_Request * recvq_posted_tail = 0;
47 static MPID_Request * recvq_unexpected_head = 0;
48 static MPID_Request * recvq_unexpected_tail = 0;
49
50 /* Export the location of the queue heads if debugger support is enabled.
51 * This allows the queue code to rely on the local variables for the
52 * queue heads while also exporting those variables to the debugger.
53 * See src/mpi/debugger/dll_mpich2.c for how this is used to
54 * access the message queues.
55 */
56 #ifdef HAVE_DEBUGGER_SUPPORT
57 MPID_Request ** const MPID_Recvq_posted_head_ptr = &recvq_posted_head;
58 MPID_Request ** const MPID_Recvq_unexpected_head_ptr = &recvq_unexpected_head;
59 #endif
60
61 /* TODO decide control this independently via configure or with the existing
62 * --enable-timing option (#ifdef COLLECT_STATS) */
63 #define ENABLE_RECVQ_STATISTICS 1
64 #ifdef ENABLE_RECVQ_STATISTICS
65 static unsigned int posted_qlen = 0;
66 static unsigned int unexpected_qlen = 0;
67 static MPI_Aint posted_recvq_match_attempts = 0;
68 static MPI_Aint unexpected_recvq_match_attempts = 0;
69 /* TODO add some code here and probably elsewhere to make these show up in the
70 * MPIX_T_pvar_ interface */
71 #define MPIR_T_INC(x) (++(x))
72 #define MPIR_T_DEC(x) (--(x))
73
74 #else
75
76 #define MPIR_T_INC(x)
77 #define MPIR_T_DEC(x)
78
79 #endif /* defined(ENABLE_RECVQ_STATISTICS) */
80
/* Message-matching helpers.  Each macro evaluates to non-zero when the two
 * MPIDI_Message_match values match under its masking rule.  If the
 * MPIDI_Message_match structure fits into a pointer-sized word, the whole
 * word is compared at once; otherwise rank, tag and context_id are compared
 * field by field (context_id is never masked in the field-by-field form). */
/* MATCH_WITH_NO_MASK compares the match values without masking
 * them. This is useful for the case where there are no ANY_TAG or
 * ANY_SOURCE wild cards. */
#define MATCH_WITH_NO_MASK(match1, match2) \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ? ((match1).whole == (match2).whole) : \
     (((match1).parts.rank == (match2).parts.rank) && \
      ((match1).parts.tag == (match2).parts.tag) && \
      ((match1).parts.context_id == (match2).parts.context_id)))

/* MATCH_WITH_LEFT_MASK masks only the LEFT operand before comparing, so the
 * RIGHT operand must already have its wildcard fields cleared (pre-masked
 * with the same mask).  In this file the left operand is an entry of the
 * unexpected queue and the right operand is the search pattern, whose
 * ANY_TAG/ANY_SOURCE fields were set to 0 together with the corresponding
 * mask fields. */
#define MATCH_WITH_LEFT_MASK(match1, match2, mask) \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ? \
     (((match1).whole & (mask).whole) == (match2).whole) : \
     ((((match1).parts.rank & (mask).parts.rank) == (match2).parts.rank) && \
      (((match1).parts.tag & (mask).parts.tag) == (match2).parts.tag) && \
      ((match1).parts.context_id == (match2).parts.context_id)))

/* This is the most general case where both matches have to be
 * masked. Both matches are masked with the same value. There doesn't
 * seem to be a need for two different masks at this time.  In this file
 * it matches a posted receive (left) against an incoming message's match
 * info (right), using the posted receive's own dev.mask. */
#define MATCH_WITH_LEFT_RIGHT_MASK(match1, match2, mask) \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ? \
     (((match1).whole & (mask).whole) == ((match2).whole & (mask).whole)) : \
     ((((match1).parts.rank & (mask).parts.rank) == ((match2).parts.rank & (mask).parts.rank)) && \
      (((match1).parts.tag & (mask).parts.tag) == ((match2).parts.tag & (mask).parts.tag)) && \
      ((match1).parts.context_id == (match2).parts.context_id)))
112
113 /* will be invoked to populate the custom parts of pvar_handle objects */
simple_uint_creator(void * obj_handle,struct MPIR_T_pvar_handle * handle,int * countp)114 static int simple_uint_creator(void *obj_handle,
115 struct MPIR_T_pvar_handle *handle,
116 int *countp)
117 {
118 /* the IMPL_SIMPLE code reads/writes "bytes" bytes from the location given
119 * by the "handle_state" pointer */
120 handle->handle_state = handle->info->var_state;
121 handle->bytes = sizeof(unsigned int);
122
123 /* a single unsigned int should be read/written */
124 *countp = 1;
125 return MPI_SUCCESS;
126 }
127
simple_aint_creator(void * obj_handle,struct MPIR_T_pvar_handle * handle,int * countp)128 static int simple_aint_creator(void *obj_handle,
129 struct MPIR_T_pvar_handle *handle,
130 int *countp)
131 {
132 /* the IMPL_SIMPLE code reads/writes "bytes" bytes from the location given
133 * by the "handle_state" pointer */
134 handle->handle_state = handle->info->var_state;
135 handle->bytes = sizeof(MPI_Aint);
136
137 /* a single Aint should be read/written */
138 *countp = 1;
139 return MPI_SUCCESS;
140 }
141
142 #undef FUNCNAME
143 #define FUNCNAME MPIDI_CH3U_Recvq_FU
144 #undef FCNAME
145 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_init(void)146 int MPIDI_CH3U_Recvq_init(void)
147 {
148 int mpi_errno = MPI_SUCCESS;
149 #ifdef ENABLE_RECVQ_STATISTICS
150 int idx = -1;
151 mpi_errno = MPIR_T_pvar_add("posted_recvq_length",
152 MPIX_T_VERBOSITY_USER_DETAIL,
153 MPIX_T_PVAR_CLASS_LEVEL,
154 MPI_UNSIGNED,
155 MPIX_T_ENUM_NULL,
156 "length of the posted message receive queue",
157 MPIX_T_BIND_NO_OBJECT,
158 /*readonly=*/TRUE,
159 /*continuous=*/TRUE,
160 /*atomic=*/FALSE,
161 MPIR_T_PVAR_IMPL_SIMPLE,
162 /*var_state=*/&posted_qlen,
163 &simple_uint_creator,
164 &idx);
165 if (mpi_errno) MPIU_ERR_POP(mpi_errno);
166
167 mpi_errno = MPIR_T_pvar_add("unexpected_recvq_length",
168 MPIX_T_VERBOSITY_USER_DETAIL,
169 MPIX_T_PVAR_CLASS_LEVEL,
170 MPI_UNSIGNED,
171 MPIX_T_ENUM_NULL,
172 "length of the unexpected messsage receive queue",
173 MPIX_T_BIND_NO_OBJECT,
174 /*readonly=*/TRUE,
175 /*continuous=*/TRUE,
176 /*atomic=*/FALSE,
177 MPIR_T_PVAR_IMPL_SIMPLE,
178 /*var_state=*/&unexpected_qlen,
179 &simple_uint_creator,
180 &idx);
181 if (mpi_errno) MPIU_ERR_POP(mpi_errno);
182
183 /* posted receive queue failed matches */
184 mpi_errno = MPIR_T_pvar_add("posted_recvq_match_attempts",
185 MPIX_T_VERBOSITY_USER_DETAIL,
186 MPIX_T_PVAR_CLASS_COUNTER,
187 MPI_AINT,
188 MPIX_T_ENUM_NULL,
189 "number of search passes on the messsage receive queue",
190 MPIX_T_BIND_NO_OBJECT,
191 /*readonly=*/FALSE,
192 /*continuous=*/TRUE,
193 /*atomic=*/FALSE,
194 MPIR_T_PVAR_IMPL_SIMPLE,
195 /*var_state=*/&posted_recvq_match_attempts,
196 &simple_aint_creator,
197 &idx);
198 if (mpi_errno) MPIU_ERR_POP(mpi_errno);
199
200 /* unexpected receive queue failed matches */
201 mpi_errno = MPIR_T_pvar_add("unexpected_recvq_match_attempts",
202 MPIX_T_VERBOSITY_USER_DETAIL,
203 MPIX_T_PVAR_CLASS_COUNTER,
204 MPI_AINT,
205 MPIX_T_ENUM_NULL,
206 "number of search passes on the messsage receive queue",
207 MPIX_T_BIND_NO_OBJECT,
208 /*readonly=*/FALSE,
209 /*continuous=*/TRUE,
210 /*atomic=*/FALSE,
211 MPIR_T_PVAR_IMPL_SIMPLE,
212 /*var_state=*/&unexpected_recvq_match_attempts,
213 &simple_aint_creator,
214 &idx);
215 if (mpi_errno) MPIU_ERR_POP(mpi_errno);
216 #endif
217 fn_fail:
218 return mpi_errno;
219 }
220
221 /* FIXME: If this routine is only used by probe/iprobe, then we don't need
222 to set the cancelled field in status (only set for nonblocking requests) */
223 /*
224 * MPIDI_CH3U_Recvq_FU()
225 *
226 * Search for a matching request in the unexpected receive queue. Return
227 * true if one is found, false otherwise. If the status arguement is
228 * not MPI_STATUS_IGNORE, return information about the request in that
229 * parameter. This routine is used by mpid_probe and mpid_iprobe.
230 *
231 * Multithread - As this is a read-only routine, it need not
232 * require an external critical section (careful organization of the
233 * queue updates would not even require a critical section within this
234 * routine). However, this routine is used both from within the progress
235 * engine and from without it. To make that work with the current
236 * design for MSGQUEUE and the brief-global mode, the critical section
237 * is *outside* of this routine.
238 *
239 * This routine is used only in mpid_iprobe and mpid_probe
240 *
241 */
242 #undef FUNCNAME
243 #define FUNCNAME MPIDI_CH3U_Recvq_FU
244 #undef FCNAME
245 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FU(int source,int tag,int context_id,MPI_Status * s)246 int MPIDI_CH3U_Recvq_FU(int source, int tag, int context_id, MPI_Status *s)
247 {
248 MPID_Request * rreq;
249 int found = 0;
250 MPIDI_Message_match match, mask;
251 MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FU);
252
253 MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FU);
254
255 MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
256
257 rreq = recvq_unexpected_head;
258
259 match.parts.context_id = context_id;
260 match.parts.tag = tag;
261 match.parts.rank = source;
262
263 if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) {
264 while (rreq != NULL) {
265 MPIR_T_INC(unexpected_recvq_match_attempts);
266 if (MATCH_WITH_NO_MASK(rreq->dev.match, match))
267 break;
268 rreq = rreq->dev.next;
269 }
270 }
271 else {
272 mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
273 if (tag == MPI_ANY_TAG)
274 match.parts.tag = mask.parts.tag = 0;
275 if (source == MPI_ANY_SOURCE)
276 match.parts.rank = mask.parts.rank = 0;
277
278 while (rreq != NULL) {
279 MPIR_T_INC(unexpected_recvq_match_attempts);
280 if (MATCH_WITH_LEFT_MASK(rreq->dev.match, match, mask))
281 break;
282 rreq = rreq->dev.next;
283 }
284 }
285
286 /* Save the information about the request before releasing the
287 queue */
288 if (rreq) {
289 if (s != MPI_STATUS_IGNORE) {
290 /* Avoid setting "extra" fields like MPI_ERROR */
291 s->MPI_SOURCE = rreq->status.MPI_SOURCE;
292 s->MPI_TAG = rreq->status.MPI_TAG;
293 s->count = rreq->status.count;
294 s->cancelled = rreq->status.cancelled;
295 }
296 found = 1;
297 }
298
299 MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FU);
300 return found;
301 }
302
303 /*
304 * MPIDI_CH3U_Recvq_FDU()
305 *
306 * Find a request in the unexpected queue and dequeue it; otherwise return NULL.
307 *
308 * Multithread - This routine must be atomic (since it dequeues a
309 * request). However, once the request is dequeued, no other thread can
310 * see it, so this routine provides its own atomicity.
311 *
312 * This routine is used only in the case of send_cancel. However, it is used both
313 * within mpid_send_cancel and within a packet handler.
314 */
315 #undef FUNCNAME
316 #define FUNCNAME MPIDI_CH3U_Recvq_FDU
317 #undef FCNAME
318 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDU(MPI_Request sreq_id,MPIDI_Message_match * match)319 MPID_Request * MPIDI_CH3U_Recvq_FDU(MPI_Request sreq_id,
320 MPIDI_Message_match * match)
321 {
322 MPID_Request * rreq;
323 MPID_Request * prev_rreq;
324 MPID_Request * cur_rreq;
325 MPID_Request * matching_prev_rreq;
326 MPID_Request * matching_cur_rreq;
327 MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);
328
329 MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);
330
331 MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
332
333 matching_prev_rreq = NULL;
334 matching_cur_rreq = NULL;
335 prev_rreq = NULL;
336
337 /* Note that since this routine is used only in the case of send_cancel,
338 there can be only one match if at all. */
339 /* FIXME: Why doesn't this exit after it finds the first match? */
340 cur_rreq = recvq_unexpected_head;
341 while (cur_rreq != NULL) {
342 if (cur_rreq->dev.sender_req_id == sreq_id &&
343 MPIR_T_INC(unexpected_recvq_match_attempts) &&
344 (MATCH_WITH_NO_MASK(cur_rreq->dev.match, *match))) {
345 matching_prev_rreq = prev_rreq;
346 matching_cur_rreq = cur_rreq;
347 }
348 prev_rreq = cur_rreq;
349 cur_rreq = cur_rreq->dev.next;
350 }
351
352 if (matching_cur_rreq != NULL) {
353 if (matching_prev_rreq != NULL) {
354 matching_prev_rreq->dev.next = matching_cur_rreq->dev.next;
355 }
356 else {
357 recvq_unexpected_head = matching_cur_rreq->dev.next;
358 }
359
360 if (matching_cur_rreq->dev.next == NULL) {
361 recvq_unexpected_tail = matching_prev_rreq;
362 }
363
364 MPIR_T_DEC(unexpected_qlen);
365 rreq = matching_cur_rreq;
366 }
367 else {
368 rreq = NULL;
369 }
370
371 MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);
372 return rreq;
373 }
374
375 /* TODO rename the old FDU and use that name for this one */
376 /* This is the routine that you expect to be named "_FDU". It implements the
377 * behavior needed for improbe; specifically, searching the receive queue for
378 * the first matching request and dequeueing it. */
379 #undef FUNCNAME
380 #define FUNCNAME MPIDI_CH3U_Recvq_FDU_matchonly
381 #undef FCNAME
382 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDU_matchonly(int source,int tag,int context_id,MPID_Comm * comm,int * foundp)383 MPID_Request * MPIDI_CH3U_Recvq_FDU_matchonly(int source, int tag, int context_id, MPID_Comm *comm, int *foundp)
384 {
385 int found = FALSE;
386 MPID_Request *rreq, *prev_rreq;
387 MPIDI_Message_match match;
388 MPIDI_Message_match mask;
389 MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);
390
391 MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);
392
393 MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
394
395 /* Optimize this loop for an empty unexpected receive queue */
396 rreq = recvq_unexpected_head;
397 if (rreq) {
398 prev_rreq = NULL;
399
400 match.parts.context_id = context_id;
401 match.parts.tag = tag;
402 match.parts.rank = source;
403
404 if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) {
405 do {
406 MPIR_T_INC(unexpected_recvq_match_attempts);
407 if (MATCH_WITH_NO_MASK(rreq->dev.match, match)) {
408 if (prev_rreq != NULL) {
409 prev_rreq->dev.next = rreq->dev.next;
410 }
411 else {
412 recvq_unexpected_head = rreq->dev.next;
413 }
414
415 if (rreq->dev.next == NULL) {
416 recvq_unexpected_tail = prev_rreq;
417 }
418 MPIR_T_DEC(unexpected_qlen);
419
420 rreq->comm = comm;
421 MPIR_Comm_add_ref(comm);
422 /* don't have the (buf,count,type) info right now, can't add
423 * it to the request */
424 found = TRUE;
425 goto lock_exit;
426 }
427 prev_rreq = rreq;
428 rreq = rreq->dev.next;
429 } while (rreq);
430 }
431 else {
432 mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
433 if (tag == MPI_ANY_TAG)
434 match.parts.tag = mask.parts.tag = 0;
435 if (source == MPI_ANY_SOURCE)
436 match.parts.rank = mask.parts.rank = 0;
437
438 do {
439 MPIR_T_INC(unexpected_recvq_match_attempts);
440 if (MATCH_WITH_LEFT_MASK(rreq->dev.match, match, mask)) {
441 if (prev_rreq != NULL) {
442 prev_rreq->dev.next = rreq->dev.next;
443 }
444 else {
445 recvq_unexpected_head = rreq->dev.next;
446 }
447 if (rreq->dev.next == NULL) {
448 recvq_unexpected_tail = prev_rreq;
449 }
450 MPIR_T_DEC(unexpected_qlen);
451
452 rreq->comm = comm;
453 MPIR_Comm_add_ref(comm);
454 /* don't have the (buf,count,type) info right now, can't add
455 * it to the request */
456 found = TRUE;
457 goto lock_exit;
458 }
459 prev_rreq = rreq;
460 rreq = rreq->dev.next;
461 } while (rreq);
462 }
463 }
464
465 lock_exit:
466 *foundp = found;
467
468 MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);
469 return rreq;
470 }
471
472 /*
473 * MPIDI_CH3U_Recvq_FDU_or_AEP()
474 *
475 * Atomically find a request in the unexpected queue and dequeue it, or
476 * allocate a new request and enqueue it in the posted queue
477 *
478 * Multithread - This routine must be called from within a MSGQUEUE
479 * critical section. If a request is allocated, it must not release
480 * the MSGQUEUE until the request is completely valid, as another thread
481 * may then find it and dequeue it.
482 *
483 * This routine is used in mpid_irecv and mpid_recv.
484 *
485 */
486 #undef FUNCNAME
487 #define FUNCNAME MPIDI_CH3U_Recvq_FDU_or_AEP
488 #undef FCNAME
489 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDU_or_AEP(int source,int tag,int context_id,MPID_Comm * comm,void * user_buf,int user_count,MPI_Datatype datatype,int * foundp)490 MPID_Request * MPIDI_CH3U_Recvq_FDU_or_AEP(int source, int tag,
491 int context_id, MPID_Comm *comm, void *user_buf,
492 int user_count, MPI_Datatype datatype, int * foundp)
493 {
494 int found;
495 MPID_Request *rreq, *prev_rreq;
496 MPIDI_Message_match match;
497 MPIDI_Message_match mask;
498 MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);
499
500 MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);
501
502 MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
503
504 /* Optimize this loop for an empty unexpected receive queue */
505 rreq = recvq_unexpected_head;
506 if (rreq) {
507 prev_rreq = NULL;
508
509 match.parts.context_id = context_id;
510 match.parts.tag = tag;
511 match.parts.rank = source;
512
513 if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) {
514 do {
515 MPIR_T_INC(unexpected_recvq_match_attempts);
516 if (MATCH_WITH_NO_MASK(rreq->dev.match, match)) {
517 if (prev_rreq != NULL) {
518 prev_rreq->dev.next = rreq->dev.next;
519 }
520 else {
521 recvq_unexpected_head = rreq->dev.next;
522 }
523
524 if (rreq->dev.next == NULL) {
525 recvq_unexpected_tail = prev_rreq;
526 }
527 MPIR_T_DEC(unexpected_qlen);
528
529 rreq->comm = comm;
530 MPIR_Comm_add_ref(comm);
531 rreq->dev.user_buf = user_buf;
532 rreq->dev.user_count = user_count;
533 rreq->dev.datatype = datatype;
534 found = TRUE;
535 goto lock_exit;
536 }
537 prev_rreq = rreq;
538 rreq = rreq->dev.next;
539 } while (rreq);
540 }
541 else {
542 mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
543 if (tag == MPI_ANY_TAG)
544 match.parts.tag = mask.parts.tag = 0;
545 if (source == MPI_ANY_SOURCE)
546 match.parts.rank = mask.parts.rank = 0;
547
548 do {
549 MPIR_T_INC(unexpected_recvq_match_attempts);
550 if (MATCH_WITH_LEFT_MASK(rreq->dev.match, match, mask)) {
551 if (prev_rreq != NULL) {
552 prev_rreq->dev.next = rreq->dev.next;
553 }
554 else {
555 recvq_unexpected_head = rreq->dev.next;
556 }
557 if (rreq->dev.next == NULL) {
558 recvq_unexpected_tail = prev_rreq;
559 }
560 MPIR_T_DEC(unexpected_qlen);
561
562 rreq->comm = comm;
563 MPIR_Comm_add_ref(comm);
564 rreq->dev.user_buf = user_buf;
565 rreq->dev.user_count = user_count;
566 rreq->dev.datatype = datatype;
567 found = TRUE;
568 goto lock_exit;
569 }
570 prev_rreq = rreq;
571 rreq = rreq->dev.next;
572 } while (rreq);
573 }
574 }
575
576 /* A matching request was not found in the unexpected queue, so we
577 need to allocate a new request and add it to the posted queue */
578 {
579 int mpi_errno = MPI_SUCCESS;
580
581 found = FALSE;
582
583 MPIDI_Request_create_rreq( rreq, mpi_errno, goto lock_exit );
584 rreq->dev.match.parts.tag = tag;
585 rreq->dev.match.parts.rank = source;
586 rreq->dev.match.parts.context_id = context_id;
587
588 /* Added a mask for faster search on 64-bit capable
589 * platforms */
590 rreq->dev.mask.parts.context_id = ~0;
591 if (rreq->dev.match.parts.rank == MPI_ANY_SOURCE)
592 rreq->dev.mask.parts.rank = 0;
593 else
594 rreq->dev.mask.parts.rank = ~0;
595 if (rreq->dev.match.parts.tag == MPI_ANY_TAG)
596 rreq->dev.mask.parts.tag = 0;
597 else
598 rreq->dev.mask.parts.tag = ~0;
599
600 rreq->comm = comm;
601 MPIR_Comm_add_ref(comm);
602 rreq->dev.user_buf = user_buf;
603 rreq->dev.user_count = user_count;
604 rreq->dev.datatype = datatype;
605
606 /* check whether VC has failed, or this is an ANY_SOURCE in a
607 failed communicator */
608 if (source != MPI_ANY_SOURCE) {
609 MPIDI_VC_t *vc;
610 MPIDI_Comm_get_vc(comm, source, &vc);
611 if (vc->state == MPIDI_VC_STATE_MORIBUND) {
612 MPIU_ERR_SET1(mpi_errno, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail", "**comm_fail %d", vc->pg_rank);
613 rreq->status.MPI_ERROR = mpi_errno;
614 MPIDI_CH3U_Request_complete(rreq);
615 goto lock_exit;
616 }
617 } else if (!MPIDI_CH3I_Comm_AS_enabled(comm)) {
618 MPIU_ERR_SET(mpi_errno, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail");
619 rreq->status.MPI_ERROR = mpi_errno;
620 MPIDI_CH3U_Request_complete(rreq);
621 goto lock_exit;
622 }
623
624 rreq->dev.next = NULL;
625 if (recvq_posted_tail != NULL) {
626 recvq_posted_tail->dev.next = rreq;
627 }
628 else {
629 recvq_posted_head = rreq;
630 }
631 recvq_posted_tail = rreq;
632 MPIR_T_INC(posted_qlen);
633 MPIDI_POSTED_RECV_ENQUEUE_HOOK(rreq);
634 }
635
636 lock_exit:
637
638 *foundp = found;
639
640 MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);
641 return rreq;
642 }
643
644
645 /*
646 * MPIDI_CH3U_Recvq_DP()
647 *
648 * Given an existing request, dequeue that request from the posted queue, or
649 * return NULL if the request was not in the posted queued
650 *
651 * Multithread - This routine is atomic
652 */
653 #undef FUNCNAME
654 #define FUNCNAME MPIDI_CH3U_Recvq_DP
655 #undef FCNAME
656 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_DP(MPID_Request * rreq)657 int MPIDI_CH3U_Recvq_DP(MPID_Request * rreq)
658 {
659 int found;
660 MPID_Request * cur_rreq;
661 MPID_Request * prev_rreq;
662 int dequeue_failed;
663 MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_DP);
664
665 MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_DP);
666
667 found = FALSE;
668 prev_rreq = NULL;
669
670 /* MT FIXME is this right? or should the caller do this? */
671 MPIU_THREAD_CS_ENTER(MSGQUEUE,);
672 cur_rreq = recvq_posted_head;
673 while (cur_rreq != NULL) {
674 if (cur_rreq == rreq) {
675 if (prev_rreq != NULL) {
676 prev_rreq->dev.next = cur_rreq->dev.next;
677 }
678 else {
679 recvq_posted_head = cur_rreq->dev.next;
680 }
681 if (cur_rreq->dev.next == NULL) {
682 recvq_posted_tail = prev_rreq;
683 }
684 MPIR_T_DEC(posted_qlen);
685 /* Notify channel that rreq has been dequeued and check if
686 it has already matched rreq, fail if so */
687 dequeue_failed = MPIDI_POSTED_RECV_DEQUEUE_HOOK(rreq);
688 if (!dequeue_failed)
689 found = TRUE;
690 break;
691 }
692
693 prev_rreq = cur_rreq;
694 cur_rreq = cur_rreq->dev.next;
695 }
696
697 MPIU_THREAD_CS_EXIT(MSGQUEUE,);
698
699 MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_DP);
700 return found;
701 }
702
703 /*
704 * MPIDI_CH3U_Recvq_FDP_or_AEU()
705 *
706 * Locate a request in the posted queue and dequeue it, or allocate a new
707 * request and enqueue it in the unexpected queue
708 *
709 * Multithread - This routine must be called from within a MSGQUEUE
710 * critical section. If a request is allocated, it must not release
711 * the MSGQUEUE until the request is completely valid, as another thread
712 * may then find it and dequeue it.
713 *
714 * This routine is used in ch3u_eager, ch3u_eagersync, ch3u_handle_recv_pkt,
715 * ch3u_rndv, and mpidi_isend_self. Routines within the progress engine
716 * will need to be careful to avoid nested critical sections.
717 *
718 * FIXME: Currently, the routines called from within the progress engine
719 * do not use the MSGQUEUE CS, because in the brief-global mode, that
720 * simply uses the global_mutex .
721 */
722 #undef FUNCNAME
723 #define FUNCNAME MPIDI_CH3U_Recvq_FDP_or_AEU
724 #undef FCNAME
725 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match,int * foundp)726 MPID_Request * MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match,
727 int * foundp)
728 {
729 int found;
730 MPID_Request * rreq;
731 MPID_Request * prev_rreq;
732 int channel_matched;
733 MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);
734
735 MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);
736
737 MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
738
739 top_loop:
740 prev_rreq = NULL;
741
742 rreq = recvq_posted_head;
743
744 while (rreq != NULL) {
745 MPIR_T_INC(posted_recvq_match_attempts);
746 if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, *match, rreq->dev.mask)) {
747 if (prev_rreq != NULL) {
748 prev_rreq->dev.next = rreq->dev.next;
749 }
750 else {
751 recvq_posted_head = rreq->dev.next;
752 }
753 if (rreq->dev.next == NULL) {
754 recvq_posted_tail = prev_rreq;
755 }
756 MPIR_T_DEC(posted_qlen);
757
758 /* give channel a chance to match the request, try again if so */
759 channel_matched = MPIDI_POSTED_RECV_DEQUEUE_HOOK(rreq);
760 if (channel_matched)
761 goto top_loop;
762
763 found = TRUE;
764 goto lock_exit;
765 }
766 prev_rreq = rreq;
767 rreq = rreq->dev.next;
768 }
769
770 /* A matching request was not found in the posted queue, so we
771 need to allocate a new request and add it to the unexpected queue */
772 {
773 int mpi_errno=0;
774 MPIDI_Request_create_rreq( rreq, mpi_errno,
775 found=FALSE;goto lock_exit );
776 MPIU_Assert(mpi_errno == 0);
777 rreq->dev.recv_pending_count = 1;
778 rreq->dev.match = *match;
779 rreq->dev.next = NULL;
780 if (recvq_unexpected_tail != NULL) {
781 recvq_unexpected_tail->dev.next = rreq;
782 }
783 else {
784 recvq_unexpected_head = rreq;
785 }
786 recvq_unexpected_tail = rreq;
787 MPIR_T_INC(unexpected_qlen);
788 }
789
790 found = FALSE;
791
792 lock_exit:
793
794 *foundp = found;
795
796 MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);
797 return rreq;
798 }
799
800 /* returns TRUE iff the request was sent on the vc */
req_uses_vc(const MPID_Request * req,const MPIDI_VC_t * vc)801 static inline int req_uses_vc(const MPID_Request* req, const MPIDI_VC_t *vc)
802 {
803 MPIDI_VC_t *vc1;
804
805 MPIDI_Comm_get_vc(req->comm, req->dev.match.parts.rank, &vc1);
806 return vc == vc1;
807 }
808
809 #undef FUNCNAME
810 #define FUNCNAME dequeue_and_set_error
811 #undef FCNAME
812 #define FCNAME MPIU_QUOTE(FUNCNAME)
813 /* This dequeues req from the posted recv queue, set req's error code to comm_fail, and updates the req pointer.
814 Note that this creates a new error code if one hasn't already been created (i.e., if *error is MPI_SUCCESS). */
dequeue_and_set_error(MPID_Request ** req,MPID_Request * prev_req,int * error,int rank)815 static inline void dequeue_and_set_error(MPID_Request **req, MPID_Request *prev_req, int *error, int rank)
816 {
817 MPID_Request *next = (*req)->dev.next;
818
819 if (*error == MPI_SUCCESS) {
820 if (rank == MPI_PROC_NULL)
821 MPIU_ERR_SET(*error, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail");
822 else
823 MPIU_ERR_SET1(*error, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail", "**comm_fail %d", rank);
824 }
825
826 /* remove from queue */
827 if (recvq_posted_head == *req)
828 recvq_posted_head = (*req)->dev.next;
829 else
830 prev_req->dev.next = (*req)->dev.next;
831 if (recvq_posted_tail == *req)
832 recvq_posted_tail = prev_req;
833
834 MPIR_T_DEC(posted_qlen);
835
836 /* set error and complete */
837 (*req)->status.MPI_ERROR = *error;
838 MPIDI_CH3U_Request_complete(*req);
839 MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
840 (MPIU_DBG_FDEST, "set error of req %p (%#08x) to %#x and completing.",
841 *req, (*req)->handle, *error));
842 *req = next;
843 }
844
845 #undef FUNCNAME
846 #define FUNCNAME MPIDI_CH3U_Complete_disabled_anysources
847 #undef FCNAME
848 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDI_CH3U_Complete_disabled_anysources(void)849 int MPIDI_CH3U_Complete_disabled_anysources(void)
850 {
851 int mpi_errno = MPI_SUCCESS;
852 MPID_Request *req, *prev_req;
853 int error = MPI_SUCCESS;
854 MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
855
856 MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
857 MPIU_THREAD_CS_ENTER(MSGQUEUE,);
858
859 /* Check each request in the posted queue, and complete-with-error any
860 anysource requests posted on communicators that have disabled
861 anysources */
862 req = recvq_posted_head;
863 prev_req = NULL;
864 while (req) {
865 if (req->dev.match.parts.rank == MPI_ANY_SOURCE && !MPIDI_CH3I_Comm_AS_enabled(req->comm)) {
866 dequeue_and_set_error(&req, prev_req, &error, MPI_PROC_NULL); /* we don't know the rank of the failed proc */
867 } else {
868 prev_req = req;
869 req = req->dev.next;
870 }
871 }
872
873 fn_exit:
874 MPIU_THREAD_CS_EXIT(MSGQUEUE,);
875
876 MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
877 return mpi_errno;
878 fn_fail:
879 goto fn_exit;
880 }
881
882
883 #undef FUNCNAME
884 #define FUNCNAME MPIDU_Complete_posted_with_error
885 #undef FCNAME
886 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t * vc)887 int MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t *vc)
888 {
889 int mpi_errno = MPI_SUCCESS;
890 MPID_Request *req, *prev_req;
891 int error = MPI_SUCCESS;
892 MPIDI_STATE_DECL(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);
893
894 MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);
895
896 MPIU_THREAD_CS_ENTER(MSGQUEUE,);
897
898 /* check each req in the posted queue and complete-with-error any requests
899 using this VC. */
900 req = recvq_posted_head;
901 prev_req = NULL;
902 while (req) {
903 if (req->dev.match.parts.rank != MPI_ANY_SOURCE && req_uses_vc(req, vc)) {
904 dequeue_and_set_error(&req, prev_req, &error, vc->pg_rank);
905 } else {
906 prev_req = req;
907 req = req->dev.next;
908 }
909 }
910
911 fn_exit:
912 MPIU_THREAD_CS_EXIT(MSGQUEUE,);
913
914 MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);
915 return mpi_errno;
916 fn_fail:
917 goto fn_exit;
918 }
919
920
921 /* --BEGIN ERROR HANDLING-- */
922 /* pretty prints tag, returns out for calling convenience */
tag_val_to_str(int tag,char * out,int max)923 static char *tag_val_to_str(int tag, char *out, int max)
924 {
925 if (tag == MPI_ANY_TAG) {
926 MPIU_Strncpy(out, "MPI_ANY_TAG", max);
927 }
928 else {
929 MPIU_Snprintf(out, max, "%d", tag);
930 }
931 return out;
932 }
933
934 /* pretty prints rank, returns out for calling convenience */
rank_val_to_str(int rank,char * out,int max)935 static char *rank_val_to_str(int rank, char *out, int max)
936 {
937 if (rank == MPI_ANY_SOURCE) {
938 MPIU_Strncpy(out, "MPI_ANY_SOURCE", max);
939 }
940 else {
941 MPIU_Snprintf(out, max, "%d", rank);
942 }
943 return out;
944 }
945
946 /* satisfy the compiler */
947 void MPIDI_CH3U_Dbg_print_recvq(FILE *stream);
948
949 /* This function can be called by a debugger to dump the recvq state to the
950 * given stream. */
951 #undef FUNCNAME
952 #define FUNCNAME MPIDI_CH3U_Dbg_print_recvq
953 #undef FCNAME
954 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Dbg_print_recvq(FILE * stream)955 void MPIDI_CH3U_Dbg_print_recvq(FILE *stream)
956 {
957 MPID_Request * rreq;
958 int i;
959 char tag_buf[128];
960 char rank_buf[128];
961
962 fprintf(stream, "========================================\n");
963 fprintf(stream, "MPI_COMM_WORLD ctx=%#x rank=%d\n", MPIR_Process.comm_world->context_id, MPIR_Process.comm_world->rank);
964 fprintf(stream, "MPI_COMM_SELF ctx=%#x\n", MPIR_Process.comm_self->context_id);
965 if (MPIR_Process.comm_parent) {
966 fprintf(stream, "MPI_COMM_PARENT ctx=%#x recvctx=%#x\n",
967 MPIR_Process.comm_self->context_id,
968 MPIR_Process.comm_parent->recvcontext_id);
969 }
970 else {
971 fprintf(stream, "MPI_COMM_PARENT (NULL)\n");
972 }
973
974 fprintf(stream, "CH3 Posted RecvQ:\n");
975 rreq = recvq_posted_head;
976 i = 0;
977 while (rreq != NULL) {
978 fprintf(stream, "..[%d] rreq=%p ctx=%#x rank=%s tag=%s\n", i, rreq,
979 rreq->dev.match.parts.context_id,
980 rank_val_to_str(rreq->dev.match.parts.rank, rank_buf, sizeof(rank_buf)),
981 tag_val_to_str(rreq->dev.match.parts.tag, tag_buf, sizeof(tag_buf)));
982 ++i;
983 rreq = rreq->dev.next;
984 }
985
986 fprintf(stream, "CH3 Unexpected RecvQ:\n");
987 rreq = recvq_unexpected_head;
988 i = 0;
989 while (rreq != NULL) {
990 fprintf(stream, "..[%d] rreq=%p ctx=%#x rank=%s tag=%s\n", i, rreq,
991 rreq->dev.match.parts.context_id,
992 rank_val_to_str(rreq->dev.match.parts.rank, rank_buf, sizeof(rank_buf)),
993 tag_val_to_str(rreq->dev.match.parts.tag, tag_buf, sizeof(tag_buf)));
994 fprintf(stream, ".. status.src=%s status.tag=%s\n",
995 rank_val_to_str(rreq->status.MPI_SOURCE, rank_buf, sizeof(rank_buf)),
996 tag_val_to_str(rreq->status.MPI_TAG, tag_buf, sizeof(tag_buf)));
997 ++i;
998 rreq = rreq->dev.next;
999 }
1000 fprintf(stream, "========================================\n");
1001 }
1002 /* --END ERROR HANDLING-- */
1003
1004 /* returns the number of elements in the unexpected queue */
1005 #undef FUNCNAME
1006 #define FUNCNAME MPIDI_CH3U_Recvq_count_unexp
1007 #undef FCNAME
1008 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPIDI_CH3U_Recvq_count_unexp(void)1009 int MPIDI_CH3U_Recvq_count_unexp(void)
1010 {
1011 int count = 0;
1012 MPID_Request *req = recvq_unexpected_head;
1013
1014 MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
1015
1016 while (req)
1017 {
1018 ++count;
1019 req = req->dev.next;
1020 }
1021
1022 return count;
1023 }
1024