1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #if (API_SET != API_SET_1) && (API_SET != API_SET_2)
7 #error Undefined API SET
8 #endif
9 
10 /* ------------------------------------------------------------------------ */
11 /* peek_callback called when a successful peek is completed                 */
12 /* ------------------------------------------------------------------------ */
ADD_SUFFIX(peek_callback)13 static int ADD_SUFFIX(peek_callback) (cq_tagged_entry_t * wc, MPIR_Request * rreq) {
14     int mpi_errno = MPI_SUCCESS;
15     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_PEEK_CALLBACK);
16     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_PEEK_CALLBACK);
17     REQ_OFI(rreq)->match_state = PEEK_FOUND;
18 #if API_SET == API_SET_1
19     rreq->status.MPI_SOURCE = get_source(wc->tag);
20 #elif API_SET == API_SET_2
21     rreq->status.MPI_SOURCE = wc->data;
22 #endif
23     rreq->status.MPI_TAG = get_tag(wc->tag);
24     MPIR_STATUS_SET_COUNT(rreq->status, wc->len);
25     rreq->status.MPI_ERROR = MPI_SUCCESS;
26     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_PEEK_CALLBACK);
27     return mpi_errno;
28 }
29 
ADD_SUFFIX(MPID_nem_ofi_iprobe_impl)30 int ADD_SUFFIX(MPID_nem_ofi_iprobe_impl) (struct MPIDI_VC * vc,
31                                           int source,
32                                           int tag,
33                                           MPIR_Comm * comm,
34                                           int context_offset,
35                                           int *flag, MPI_Status * status, MPIR_Request ** rreq_ptr)
36 {
37     int ret, mpi_errno = MPI_SUCCESS;
38     fi_addr_t remote_proc = 0;
39     uint64_t match_bits, mask_bits;
40     size_t len;
41     MPIR_Request rreq_s, *rreq;
42 
43     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_IPROBE_IMPL);
44     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_IPROBE_IMPL);
45     if (rreq_ptr) {
46         MPIDI_CH3I_NM_OFI_RC(MPID_nem_ofi_create_req(&rreq, 1));
47         rreq->kind = MPIR_REQUEST_KIND__RECV;
48 
49         *rreq_ptr = rreq;
50         rreq->comm = comm;
51         rreq->dev.match.parts.rank = source;
52         rreq->dev.match.parts.tag = tag;
53         rreq->dev.match.parts.context_id = comm->context_id;
54         MPIR_Comm_add_ref(comm);
55     } else {
56         rreq = &rreq_s;
57         rreq->dev.OnDataAvail = NULL;
58     }
59 
60     REQ_OFI(rreq)->pack_buffer = NULL;
61     REQ_OFI(rreq)->event_callback = ADD_SUFFIX(peek_callback);
62     REQ_OFI(rreq)->match_state = PEEK_INIT;
63     OFI_ADDR_INIT(source, vc, remote_proc);
64 #if API_SET == API_SET_1
65     match_bits = init_recvtag(&mask_bits, comm->recvcontext_id + context_offset, source, tag);
66 #elif API_SET == API_SET_2
67     match_bits = init_recvtag_2(&mask_bits, comm->recvcontext_id + context_offset, tag);
68 #endif
69 
70     /* ------------------------------------------------------------------------- */
71     /* fi_recvmsg with FI_PEEK:                                                  */
72     /* Initiate a search for a match in the hardware or software queue.          */
73     /* The search can complete immediately with -ENOMSG.                         */
74     /* I successful, libfabric will enqueue a context entry into the completion  */
75     /* queue to make the search nonblocking.  This code will poll until the      */
76     /* entry is enqueued.                                                        */
77     /* ------------------------------------------------------------------------- */
78     msg_tagged_t msg;
79     uint64_t msgflags = FI_PEEK;
80     msg.msg_iov = NULL;
81     msg.desc = NULL;
82     msg.iov_count = 0;
83     msg.addr = remote_proc;
84     msg.tag = match_bits;
85     msg.ignore = mask_bits;
86     msg.context = (void *) &(REQ_OFI(rreq)->ofi_context);
87     msg.data = 0;
88     if (*flag == CLAIM_PEEK)
89         msgflags |= FI_CLAIM;
90     ret = fi_trecvmsg(gl_data.endpoint, &msg, msgflags);
91     if (ret == -ENOMSG) {
92         if (rreq_ptr) {
93             MPIR_Request_free(rreq);
94             *rreq_ptr = NULL;
95             *flag = 0;
96         }
97         MPID_nem_ofi_poll(MPID_NONBLOCKING_POLL);
98         goto fn_exit;
99     }
100     MPIR_ERR_CHKANDJUMP4((ret < 0), mpi_errno, MPI_ERR_OTHER,
101                          "**ofi_peek", "**ofi_peek %s %d %s %s",
102                          __SHORT_FILE__, __LINE__, __func__, fi_strerror(-ret));
103 
104     while (PEEK_INIT == REQ_OFI(rreq)->match_state)
105         MPID_nem_ofi_poll(MPID_BLOCKING_POLL);
106 
107     if (PEEK_NOT_FOUND == REQ_OFI(rreq)->match_state) {
108         if (rreq_ptr) {
109             MPIR_Request_free(rreq);
110             *rreq_ptr = NULL;
111             *flag = 0;
112         }
113         MPID_nem_ofi_poll(MPID_NONBLOCKING_POLL);
114         goto fn_exit;
115     }
116 
117     if (status != MPI_STATUS_IGNORE)
118         *status = rreq->status;
119 
120     if (rreq_ptr)
121         MPIR_Request_add_ref(rreq);
122     *flag = 1;
123   fn_exit:
124     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_IPROBE_IMPL);
125     return mpi_errno;
126   fn_fail:
127     goto fn_exit;
128 }
129 
130 
ADD_SUFFIX(MPID_nem_ofi_iprobe)131 int ADD_SUFFIX(MPID_nem_ofi_iprobe) (struct MPIDI_VC * vc,
132                                      int source,
133                                      int tag,
134                                      MPIR_Comm * comm, int context_offset, int *flag,
135                                      MPI_Status * status) {
136     int rc;
137     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_IPROBE);
138     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_IPROBE);
139     *flag = 0;
140     rc = ADD_SUFFIX(MPID_nem_ofi_iprobe_impl) (vc, source,
141                                                tag, comm, context_offset, flag, status, NULL);
142     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_IPROBE);
143     return rc;
144 }
145 
ADD_SUFFIX(MPID_nem_ofi_improbe)146 int ADD_SUFFIX(MPID_nem_ofi_improbe) (struct MPIDI_VC * vc,
147                                       int source,
148                                       int tag,
149                                       MPIR_Comm * comm,
150                                       int context_offset,
151                                       int *flag, MPIR_Request ** message, MPI_Status * status) {
152     int old_error;
153     int s;
154     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_IMPROBE);
155     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_IMPROBE);
156     *flag = CLAIM_PEEK;
157     if (status != MPI_STATUS_IGNORE) {
158         old_error = status->MPI_ERROR;
159     }
160     s = ADD_SUFFIX(MPID_nem_ofi_iprobe_impl) (vc, source,
161                                               tag, comm, context_offset, flag, status, message);
162     if (*flag) {
163         if (status != MPI_STATUS_IGNORE) {
164             status->MPI_ERROR = old_error;
165         }
166         (*message)->kind = MPIR_REQUEST_KIND__MPROBE;
167     }
168     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_IMPROBE);
169     return s;
170 }
171 
ADD_SUFFIX(MPID_nem_ofi_anysource_iprobe)172 int ADD_SUFFIX(MPID_nem_ofi_anysource_iprobe) (int tag,
173                                                MPIR_Comm * comm,
174                                                int context_offset, int *flag, MPI_Status * status) {
175     int rc;
176     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IPROBE);
177     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IPROBE);
178     *flag = NORMAL_PEEK;
179     rc = ADD_SUFFIX(MPID_nem_ofi_iprobe) (NULL, MPI_ANY_SOURCE,
180                                           tag, comm, context_offset, flag, status);
181     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IPROBE);
182     return rc;
183 }
184 
ADD_SUFFIX(MPID_nem_ofi_anysource_improbe)185 int ADD_SUFFIX(MPID_nem_ofi_anysource_improbe) (int tag,
186                                                 MPIR_Comm * comm,
187                                                 int context_offset,
188                                                 int *flag, MPIR_Request ** message,
189                                                 MPI_Status * status) {
190     int rc;
191     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IMPROBE);
192     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IMPROBE);
193     *flag = CLAIM_PEEK;
194     rc = ADD_SUFFIX(MPID_nem_ofi_improbe) (NULL, MPI_ANY_SOURCE, tag, comm,
195                                            context_offset, flag, message, status);
196     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_OFI_ANYSOURCE_IMPROBE);
197     return rc;
198 }
199