1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #include "mpidimpl.h"
7
/*
 * MPID_Mprobe - blocking matched probe.
 *
 * Loops until a message matching (source, tag, context id) is found, making
 * communication progress between attempts.
 *
 * Parameters:
 *   source         - rank to match, or MPI_ANY_SOURCE
 *   tag            - tag to match
 *   comm           - communicator; its recvcontext_id plus context_offset
 *                    forms the context id used for matching
 *   context_offset - offset added to comm->recvcontext_id
 *   message        - [out] set to NULL on entry, then to the matched request
 *   status         - [out] filled from the matched request (for netmod
 *                    matches it is handed to the netmod's improbe callback)
 *
 * Returns MPI_SUCCESS or an MPI error code.  An MPIX_ERR_REVOKED error is
 * raised if the communicator is already revoked on entry.
 */
int MPID_Mprobe(int source, int tag, MPIR_Comm *comm, int context_offset,
                MPIR_Request **message, MPI_Status *status)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Progress_state progress_state;
    int found = FALSE;
    int context_id = comm->recvcontext_id + context_offset;

    *message = NULL;

    /* Check to make sure the communicator hasn't already been revoked */
    if (comm->revoked) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPIX_ERR_REVOKED, "**revoked");
    }

#ifdef ENABLE_COMM_OVERRIDES
    if (MPIDI_Anysource_improbe_fn) {
        if (source == MPI_ANY_SOURCE) {
            /* if it's anysource, loop while checking the shm recv
             * queue and improbing the netmod, then do a progress
             * test to make some progress. */
            do {
                MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
                *message = MPIDI_CH3U_Recvq_FDU_matchonly(source, tag, context_id, comm, &found);
                MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
                if (found) {
                    /* A match taken from the receive queue needs the same
                     * post-processing as in the generic path at the end of
                     * this function; jumping straight to fn_exit would
                     * leave the request kind and the status untouched. */
                    if (*message) {
                        (*message)->kind = MPIR_REQUEST_KIND__MPROBE;
                        MPIR_Request_extract_status((*message), status);
                    }
                    goto fn_exit;
                }

                /* The netmod callback receives status itself, so no extra
                 * post-processing is done here for its matches. */
                mpi_errno = MPIDI_Anysource_improbe_fn(tag, comm, context_offset, &found,
                                                       message, status);
                MPIR_ERR_CHECK(mpi_errno);
                if (found)
                    goto fn_exit;

                MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);

                /* FIXME could this be replaced with a progress_wait? */
                mpi_errno = MPIDI_CH3_Progress_test();
                MPIR_ERR_CHECK(mpi_errno);
            } while (1);
        } else {
            /* it's not anysource, see if this is for the netmod */
            MPIDI_VC_t *vc;
            MPIDI_Comm_get_vc_set_active(comm, source, &vc);

            if (vc->comm_ops && vc->comm_ops->improbe) {
                /* netmod has overridden improbe */
                do {
                    mpi_errno = vc->comm_ops->improbe(vc, source, tag, comm, context_offset,
                                                      &found, message, status);
                    MPIR_ERR_CHECK(mpi_errno);
                    if (found)
                        goto fn_exit;

                    MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);

                    /* FIXME could this be replaced with a progress_wait? */
                    mpi_errno = MPIDI_CH3_Progress_test();
                    MPIR_ERR_CHECK(mpi_errno);
                } while (1);
            }
            /* fall-through to shm case */
        }
    }
#endif
    /* Inefficient implementation: we poll the unexpected queue looking for a
     * matching request, interleaved with calls to progress.  If there are many
     * non-matching unexpected messages in the queue then we will end up
     * needlessly scanning the UQ.
     *
     * A smarter implementation would enqueue a partial request (one lacking the
     * recv buffer triple) onto the PQ.  Unfortunately, this is a lot harder to
     * do than it seems at first because of the spread-out nature of callers to
     * various CH3U_Recvq routines and especially because of the enqueue/dequeue
     * hooks for native MX tag matching support. */
    MPIDI_CH3_Progress_start(&progress_state);
    do {
        MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
        *message = MPIDI_CH3U_Recvq_FDU_matchonly(source, tag, context_id, comm, &found);
        MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
        if (found)
            break;

        mpi_errno = MPIDI_CH3_Progress_wait(&progress_state);
    } while (mpi_errno == MPI_SUCCESS);
    /* Close the progress epoch before checking for an error, so that the
     * start/end calls stay balanced on the failure path too. */
    MPIDI_CH3_Progress_end(&progress_state);
    MPIR_ERR_CHECK(mpi_errno);

    if (*message) {
        (*message)->kind = MPIR_REQUEST_KIND__MPROBE;
        MPIR_Request_extract_status((*message), status);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
105
106