1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #if (API_SET != API_SET_1) && (API_SET != API_SET_2)
7 #error Undefined API SET
8 #endif
9
10 /* ------------------------------------------------------------------------ */
11 /* peek_callback called when a successful peek is completed */
12 /* ------------------------------------------------------------------------ */
ADD_SUFFIX(peek_callback)13 static int ADD_SUFFIX(peek_callback) (cq_tagged_entry_t * wc, MPIR_Request * rreq) {
14 int mpi_errno = MPI_SUCCESS;
15 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_PEEK_CALLBACK);
16 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_PEEK_CALLBACK);
17 REQ_OFI(rreq)->match_state = PEEK_FOUND;
18 #if API_SET == API_SET_1
19 rreq->status.MPI_SOURCE = get_source(wc->tag);
20 #elif API_SET == API_SET_2
21 rreq->status.MPI_SOURCE = wc->data;
22 #endif
23 rreq->status.MPI_TAG = get_tag(wc->tag);
24 MPIR_STATUS_SET_COUNT(rreq->status, wc->len);
25 rreq->status.MPI_ERROR = MPI_SUCCESS;
26 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_PEEK_CALLBACK);
27 return mpi_errno;
28 }
29
ADD_SUFFIX(MPID_nem_ofi_iprobe_impl)30 int ADD_SUFFIX(MPID_nem_ofi_iprobe_impl) (struct MPIDI_VC * vc,
31 int source,
32 int tag,
33 MPIR_Comm * comm,
34 int context_offset,
35 int *flag, MPI_Status * status, MPIR_Request ** rreq_ptr)
36 {
37 int ret, mpi_errno = MPI_SUCCESS;
38 fi_addr_t remote_proc = 0;
39 uint64_t match_bits, mask_bits;
40 size_t len;
41 MPIR_Request rreq_s, *rreq;
42
43 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_IPROBE_IMPL);
44 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_IPROBE_IMPL);
45 if (rreq_ptr) {
46 MPIDI_CH3I_NM_OFI_RC(MPID_nem_ofi_create_req(&rreq, 1));
47 rreq->kind = MPIR_REQUEST_KIND__RECV;
48
49 *rreq_ptr = rreq;
50 rreq->comm = comm;
51 rreq->dev.match.parts.rank = source;
52 rreq->dev.match.parts.tag = tag;
53 rreq->dev.match.parts.context_id = comm->context_id;
54 MPIR_Comm_add_ref(comm);
55 } else {
56 rreq = &rreq_s;
57 rreq->dev.OnDataAvail = NULL;
58 }
59
60 REQ_OFI(rreq)->pack_buffer = NULL;
61 REQ_OFI(rreq)->event_callback = ADD_SUFFIX(peek_callback);
62 REQ_OFI(rreq)->match_state = PEEK_INIT;
63 OFI_ADDR_INIT(source, vc, remote_proc);
64 #if API_SET == API_SET_1
65 match_bits = init_recvtag(&mask_bits, comm->recvcontext_id + context_offset, source, tag);
66 #elif API_SET == API_SET_2
67 match_bits = init_recvtag_2(&mask_bits, comm->recvcontext_id + context_offset, tag);
68 #endif
69
70 /* ------------------------------------------------------------------------- */
71 /* fi_recvmsg with FI_PEEK: */
72 /* Initiate a search for a match in the hardware or software queue. */
73 /* The search can complete immediately with -ENOMSG. */
74 /* I successful, libfabric will enqueue a context entry into the completion */
75 /* queue to make the search nonblocking. This code will poll until the */
76 /* entry is enqueued. */
77 /* ------------------------------------------------------------------------- */
78 msg_tagged_t msg;
79 uint64_t msgflags = FI_PEEK;
80 msg.msg_iov = NULL;
81 msg.desc = NULL;
82 msg.iov_count = 0;
83 msg.addr = remote_proc;
84 msg.tag = match_bits;
85 msg.ignore = mask_bits;
86 msg.context = (void *) &(REQ_OFI(rreq)->ofi_context);
87 msg.data = 0;
88 if (*flag == CLAIM_PEEK)
89 msgflags |= FI_CLAIM;
90 ret = fi_trecvmsg(gl_data.endpoint, &msg, msgflags);
91 if (ret == -ENOMSG) {
92 if (rreq_ptr) {
93 MPIR_Request_free(rreq);
94 *rreq_ptr = NULL;
95 *flag = 0;
96 }
97 MPID_nem_ofi_poll(MPID_NONBLOCKING_POLL);
98 goto fn_exit;
99 }
100 MPIR_ERR_CHKANDJUMP4((ret < 0), mpi_errno, MPI_ERR_OTHER,
101 "**ofi_peek", "**ofi_peek %s %d %s %s",
102 __SHORT_FILE__, __LINE__, __func__, fi_strerror(-ret));
103
104 while (PEEK_INIT == REQ_OFI(rreq)->match_state)
105 MPID_nem_ofi_poll(MPID_BLOCKING_POLL);
106
107 if (PEEK_NOT_FOUND == REQ_OFI(rreq)->match_state) {
108 if (rreq_ptr) {
109 MPIR_Request_free(rreq);
110 *rreq_ptr = NULL;
111 *flag = 0;
112 }
113 MPID_nem_ofi_poll(MPID_NONBLOCKING_POLL);
114 goto fn_exit;
115 }
116
117 if (status != MPI_STATUS_IGNORE)
118 *status = rreq->status;
119
120 if (rreq_ptr)
121 MPIR_Request_add_ref(rreq);
122 *flag = 1;
123 fn_exit:
124 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_IPROBE_IMPL);
125 return mpi_errno;
126 fn_fail:
127 goto fn_exit;
128 }
129
130
ADD_SUFFIX(MPID_nem_ofi_iprobe)131 int ADD_SUFFIX(MPID_nem_ofi_iprobe) (struct MPIDI_VC * vc,
132 int source,
133 int tag,
134 MPIR_Comm * comm, int context_offset, int *flag,
135 MPI_Status * status) {
136 int rc;
137 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_IPROBE);
138 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_IPROBE);
139 *flag = 0;
140 rc = ADD_SUFFIX(MPID_nem_ofi_iprobe_impl) (vc, source,
141 tag, comm, context_offset, flag, status, NULL);
142 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_IPROBE);
143 return rc;
144 }
145
ADD_SUFFIX(MPID_nem_ofi_improbe)146 int ADD_SUFFIX(MPID_nem_ofi_improbe) (struct MPIDI_VC * vc,
147 int source,
148 int tag,
149 MPIR_Comm * comm,
150 int context_offset,
151 int *flag, MPIR_Request ** message, MPI_Status * status) {
152 int old_error;
153 int s;
154 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_IMPROBE);
155 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_IMPROBE);
156 *flag = CLAIM_PEEK;
157 if (status != MPI_STATUS_IGNORE) {
158 old_error = status->MPI_ERROR;
159 }
160 s = ADD_SUFFIX(MPID_nem_ofi_iprobe_impl) (vc, source,
161 tag, comm, context_offset, flag, status, message);
162 if (*flag) {
163 if (status != MPI_STATUS_IGNORE) {
164 status->MPI_ERROR = old_error;
165 }
166 (*message)->kind = MPIR_REQUEST_KIND__MPROBE;
167 }
168 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_IMPROBE);
169 return s;
170 }
171
ADD_SUFFIX(MPID_nem_ofi_anysource_iprobe)172 int ADD_SUFFIX(MPID_nem_ofi_anysource_iprobe) (int tag,
173 MPIR_Comm * comm,
174 int context_offset, int *flag, MPI_Status * status) {
175 int rc;
176 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IPROBE);
177 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IPROBE);
178 *flag = NORMAL_PEEK;
179 rc = ADD_SUFFIX(MPID_nem_ofi_iprobe) (NULL, MPI_ANY_SOURCE,
180 tag, comm, context_offset, flag, status);
181 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IPROBE);
182 return rc;
183 }
184
ADD_SUFFIX(MPID_nem_ofi_anysource_improbe)185 int ADD_SUFFIX(MPID_nem_ofi_anysource_improbe) (int tag,
186 MPIR_Comm * comm,
187 int context_offset,
188 int *flag, MPIR_Request ** message,
189 MPI_Status * status) {
190 int rc;
191 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IMPROBE);
192 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IMPROBE);
193 *flag = CLAIM_PEEK;
194 rc = ADD_SUFFIX(MPID_nem_ofi_improbe) (NULL, MPI_ANY_SOURCE, tag, comm,
195 context_offset, flag, message, status);
196 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IMPROBE);
197 return rc;
198 }
199