1 /*
2  * Copyright (c) 2013-2018 Intel, Inc. All rights reserved
3  * Copyright (c) 2017      Los Alamos National Security, LLC. All rights
4  *                         reserved.
5  * Copyright (c) 2019-2020 Triad National Security, LLC. All rights
6  *                         reserved.
7  * Copyright (c) 2018-2020 Amazon.com, Inc. or its affiliates. All rights
8  *                         reserved.
9  *
10  * $COPYRIGHT$
11  *
12  * Additional copyrights may follow
13  *
14  * $HEADER$
15  */
16 
17 #ifndef MTL_OFI_H_HAS_BEEN_INCLUDED
18 #define MTL_OFI_H_HAS_BEEN_INCLUDED
19 
20 #include "ompi/mca/mtl/mtl.h"
21 #include "ompi/mca/mtl/base/base.h"
22 #include "opal/datatype/opal_convertor.h"
23 #include "opal/util/show_help.h"
24 
25 #include <rdma/fabric.h>
26 #include <rdma/fi_cm.h>
27 #include <rdma/fi_domain.h>
28 #include <rdma/fi_endpoint.h>
29 #include <rdma/fi_errno.h>
30 #include <rdma/fi_tagged.h>
31 
32 #include "ompi_config.h"
33 #include "ompi/proc/proc.h"
34 #include "ompi/mca/mtl/mtl.h"
35 #include "opal/class/opal_list.h"
36 #include "ompi/communicator/communicator.h"
37 #include "opal/datatype/opal_convertor.h"
38 #include "ompi/mca/mtl/base/base.h"
39 #include "ompi/mca/mtl/base/mtl_base_datatype.h"
40 #include "ompi/message/message.h"
41 #include "opal/mca/common/ofi/common_ofi.h"
42 
43 #include "mtl_ofi_opt.h"
44 #include "mtl_ofi_types.h"
45 #include "mtl_ofi_request.h"
46 #include "mtl_ofi_endpoint.h"
47 #include "mtl_ofi_compat.h"
48 
49 BEGIN_C_DECLS
50 
51 extern mca_mtl_ofi_module_t ompi_mtl_ofi;
52 extern mca_base_framework_t ompi_mtl_base_framework;
53 
54 extern int ompi_mtl_ofi_add_procs(struct mca_mtl_base_module_t *mtl,
55                                   size_t nprocs,
56                                   struct ompi_proc_t** procs);
57 
58 extern int ompi_mtl_ofi_del_procs(struct mca_mtl_base_module_t *mtl,
59                                   size_t nprocs,
60                                   struct ompi_proc_t **procs);
61 
62 int ompi_mtl_ofi_progress_no_inline(void);
63 
64 #if OPAL_HAVE_THREAD_LOCAL
65 extern opal_thread_local int per_thread_ctx;
66 extern opal_thread_local struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
67 #endif
68 
69 /* Set OFI context for operations which generate completion events */
70 __opal_attribute_always_inline__ static inline void
set_thread_context(int ctxt)71 set_thread_context(int ctxt)
72 {
73 #if OPAL_HAVE_THREAD_LOCAL
74     per_thread_ctx = ctxt;
75     return;
76 #endif
77 }
78 
79 /* Retrieve OFI context to use for CQ poll */
80 __opal_attribute_always_inline__ static inline void
get_thread_context(int * ctxt)81 get_thread_context(int *ctxt)
82 {
83 #if OPAL_HAVE_THREAD_LOCAL
84     *ctxt = per_thread_ctx;
85 #endif
86     return;
87 }
88 
/* Try to take the per-context progress lock without blocking; evaluates
 * to non-zero (the likely case) when the lock was acquired. */
#define MTL_OFI_CONTEXT_LOCK(ctxt_id) \
OPAL_LIKELY(!opal_mutex_atomic_trylock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock))

/* Release the per-context progress lock taken via MTL_OFI_CONTEXT_LOCK. */
#define MTL_OFI_CONTEXT_UNLOCK(ctxt_id) \
opal_mutex_atomic_unlock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock)
94 
/**
 * Drain completion events from the CQ bound to OFI context 'ctxt_id'.
 *
 * Each tagged completion carries the originating request in op_context;
 * that request's event callback (or, for error entries, its error
 * callback) is invoked.  Returns the number of completions processed.
 * Unrecoverable CQ or callback failures abort the job via exit(3).
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_context_progress(int ctxt_id)
{
    int count = 0, i, events_read;
    ompi_mtl_ofi_request_t *ofi_req = NULL;
    struct fi_cq_err_entry error = { 0 };
    ssize_t ret;
#if !OPAL_HAVE_THREAD_LOCAL
    /* No TLS available: use a per-call event buffer on the stack instead
     * of the shared thread-local array declared above. */
    struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
#endif

    /**
     * Read the work completions from the CQ.
     * From the completion's op_context, we get the associated OFI request.
     * Call the request's callback.
     */
    while (true) {
        ret = fi_cq_read(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, (void *)&wc,
                         ompi_mtl_ofi.ofi_progress_event_count);
        if (ret > 0) {
            count+= ret;
            events_read = ret;
            for (i = 0; i < events_read; i++) {
                if (NULL != wc[i].op_context) {
                    ofi_req = TO_OFI_REQ(wc[i].op_context);
                    assert(ofi_req);
                    ret = ofi_req->event_callback(&wc[i], ofi_req);
                    if (OMPI_SUCCESS != ret) {
                        opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
                                       "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                                       __FILE__, __LINE__, ret);
                        fflush(stderr);
                        exit(1);
                    }
                }
            }
        } else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
            /**
             * An error occured and is being reported via the CQ.
             * Read the error and forward it to the upper layer.
             */
            ret = fi_cq_readerr(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
                                &error,
                                0);
            if (0 > ret) {
                /*
                 * In multi-threaded scenarios, any thread that attempts to read
                 * a CQ when there's a pending error CQ entry gets an
                 * -FI_EAVAIL. Without any serialization here (which is okay,
                 * since libfabric will protect access to critical CQ objects),
                 * all threads proceed to read from the error CQ, but only one
                 * thread fetches the entry while others get -FI_EAGAIN
                 * indicating an empty queue, which is not erroneous.
                 */
                if (ret == -FI_EAGAIN)
                    return count;
                opal_output(0, "%s:%d: Error returned from fi_cq_readerr: %s(%zd).\n"
                               "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                               __FILE__, __LINE__, fi_strerror(-ret), ret);
                fflush(stderr);
                exit(1);
            }

            /* Forward the error entry to the owning request's error handler. */
            assert(error.op_context);
            ofi_req = TO_OFI_REQ(error.op_context);
            assert(ofi_req);
            ret = ofi_req->error_callback(&error, ofi_req);
            if (OMPI_SUCCESS != ret) {
                    opal_output(0, "%s:%d: Error returned by request error callback: %zd.\n"
                                   "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                                   __FILE__, __LINE__, ret);
                fflush(stderr);
                exit(1);
            }
        } else {
            /* -FI_EAGAIN: CQ is empty; -EINTR: interrupted poll.  Both end
             * this progress pass without error. */
            if (ret == -FI_EAGAIN || ret == -EINTR) {
                break;
            } else {
                opal_output(0, "%s:%d: Error returned from fi_cq_read: %s(%zd).\n"
                               "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                               __FILE__, __LINE__, fi_strerror(-ret), ret);
                fflush(stderr);
                exit(1);
            }
        }
    }

    return count;
}
184 
/**
 * Top-level progress function: polls the CQ of the calling thread's
 * context and, under MPI_THREAD_MULTIPLE, periodically helps drain
 * other contexts' CQs.  Returns the number of events processed.
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_progress(void)
{
    int count = 0, ctxt_id = 0, i;
    /* Call counter that throttles cross-context polling to every 16th
     * call; unsynchronized updates are tolerable since it only gates an
     * opportunistic fallback. */
    static volatile uint32_t num_calls = 0;

    get_thread_context(&ctxt_id);

    if (ompi_mtl_ofi.mpi_thread_multiple) {
        /* Trylock: if another thread is already progressing this context,
         * skip rather than block. */
        if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
            count += ompi_mtl_ofi_context_progress(ctxt_id);
            MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
        }
    } else {
        count += ompi_mtl_ofi_context_progress(ctxt_id);
    }

#if OPAL_HAVE_THREAD_LOCAL
    /*
     * Try to progress other CQs in round-robin fashion.
     * Progress is only made if no events were read from the CQ
     * local to the calling thread past 16 times.
     */
    if (OPAL_UNLIKELY((count == 0) && ompi_mtl_ofi.mpi_thread_multiple &&
        (((num_calls++) & 0xF) == 0 ))) {
        for (i = 0; i < ompi_mtl_ofi.total_ctxts_used - 1; i++) {
            ctxt_id = (ctxt_id + 1) % ompi_mtl_ofi.total_ctxts_used;

            if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
                count += ompi_mtl_ofi_context_progress(ctxt_id);
                MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
            }

            /* Upon any work done, exit to let other threads take lock */
            if (OPAL_LIKELY(count > 0)) {
                break;
            }
        }
    }
#endif

    return count;
}
228 
229 /**
230  * When attempting to execute an OFI operation we need to handle
231  * resource overrun cases. When a call to an OFI OP fails with -FI_EAGAIN
232  * the OFI mtl will attempt to progress any pending Completion Queue
233  * events that may prevent additional operations to be enqueued.
234  * If the call to ofi progress is successful, then the function call
235  * will be retried.
236  */
237 #define MTL_OFI_RETRY_UNTIL_DONE(FUNC, RETURN)         \
238     do {                                               \
239         do {                                           \
240             RETURN = FUNC;                             \
241             if (OPAL_LIKELY(0 == RETURN)) {break;}     \
242             if (OPAL_LIKELY(RETURN == -FI_EAGAIN)) {   \
243                 ompi_mtl_ofi_progress();               \
244             }                                          \
245         } while (OPAL_LIKELY(-FI_EAGAIN == RETURN));   \
246     } while (0);
247 
/* Log the libfabric error string for 'err' (a negative fi_errno value) at
 * verbosity level 1.  The do/while(0) wrapper ends without a semicolon so
 * the macro acts as one statement; call sites provide their own ';'. */
#define MTL_OFI_LOG_FI_ERR(err, string)                                     \
    do {                                                                    \
        opal_output_verbose(1, opal_common_ofi.output,                      \
                            "%s:%d:%s: %s\n",                               \
                            __FILE__, __LINE__, string, fi_strerror(-err)); \
    } while(0)
254 
255 /* MTL interface functions */
256 int ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl);
257 
258 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_get_error(int error_num)259 ompi_mtl_ofi_get_error(int error_num)
260 {
261     int ret;
262 
263     switch (error_num) {
264     case 0:
265         ret = OMPI_SUCCESS;
266         break;
267     default:
268         ret = OMPI_ERROR;
269     }
270 
271     return ret;
272 }
273 
274 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)275 ompi_mtl_ofi_send_callback(struct fi_cq_tagged_entry *wc,
276                            ompi_mtl_ofi_request_t *ofi_req)
277 {
278     assert(ofi_req->completion_count > 0);
279     ofi_req->completion_count--;
280     return OMPI_SUCCESS;
281 }
282 
283 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)284 ompi_mtl_ofi_send_error_callback(struct fi_cq_err_entry *error,
285                                  ompi_mtl_ofi_request_t *ofi_req)
286 {
287     switch(error->err) {
288         case FI_ETRUNC:
289             ofi_req->status.MPI_ERROR = MPI_ERR_TRUNCATE;
290             break;
291         default:
292             ofi_req->status.MPI_ERROR = MPI_ERR_INTERN;
293     }
294     return ofi_req->event_callback(NULL, ofi_req);
295 }
296 
297 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_ack_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)298 ompi_mtl_ofi_send_ack_callback(struct fi_cq_tagged_entry *wc,
299                                ompi_mtl_ofi_request_t *ofi_req)
300 {
301     ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;
302 
303     free(ofi_req);
304 
305     parent_req->event_callback(NULL, parent_req);
306 
307     return OMPI_SUCCESS;
308 }
309 
310 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_ack_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)311 ompi_mtl_ofi_send_ack_error_callback(struct fi_cq_err_entry *error,
312                                      ompi_mtl_ofi_request_t *ofi_req)
313 {
314     ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;
315 
316     free(ofi_req);
317 
318     parent_req->status.MPI_ERROR = MPI_ERR_INTERN;
319 
320     return parent_req->error_callback(error, parent_req);
321 }
322 
323 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)324 ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry *wc,
325                             ompi_mtl_ofi_request_t *ofi_req)
326 {
327     assert(ofi_req->completion_count > 0);
328     ofi_req->completion_count--;
329 
330     if (0 == ofi_req->completion_count) {
331         /* Request completed */
332         if (OPAL_UNLIKELY(NULL != ofi_req->buffer)) {
333             free(ofi_req->buffer);
334             ofi_req->buffer = NULL;
335         }
336 
337         ofi_req->super.ompi_req->req_status.MPI_ERROR =
338             ofi_req->status.MPI_ERROR;
339 
340         ofi_req->super.completion_callback(&ofi_req->super);
341     }
342 
343     return OMPI_SUCCESS;
344 }
345 
346 /* Return OFI context ID associated with the specific communicator */
347 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_map_comm_to_ctxt(uint32_t comm_id)348 ompi_mtl_ofi_map_comm_to_ctxt(uint32_t comm_id)
349 {
350     /* For non-thread-grouping use case, only one context is used which is
351      * associated to MPI_COMM_WORLD, so use that. */
352     if (0 == ompi_mtl_ofi.thread_grouping) {
353         comm_id = 0;
354     }
355 
356     return ompi_mtl_ofi.comm_to_context[comm_id];
357 }
358 
/**
 * Post the zero-byte ack receive that completes a synchronous send.
 *
 * Allocates a proxy request whose completion fires the parent send
 * request's callback, posts an exact-match fi_trecv for the
 * SYNC_SEND_ACK-tagged message from the target, and finally marks the
 * outgoing match bits with the SYNC_SEND protocol bit.
 *
 * NOTE(review): 'ack_req' is passed by value, so the request malloc'ed
 * here is never visible to the caller -- the caller's local ack_req stays
 * NULL and its fi_cancel()/free() cleanup path can never trigger.  Verify
 * whether this parameter should be 'ompi_mtl_ofi_request_t **'.
 *
 * 'endpoint' and 'tag' are currently unused by this function.
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req,
                  struct ompi_communicator_t *comm,
                  fi_addr_t *src_addr,
                  ompi_mtl_ofi_request_t *ofi_req,
                  mca_mtl_ofi_endpoint_t *endpoint,
                  uint64_t *match_bits,
                  int tag)
{
    ssize_t ret = OMPI_SUCCESS;
    int ctxt_id = 0;

    ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
    set_thread_context(ctxt_id);

    /* Proxy request: completes (or fails) the parent send when the ack
     * arrives.  Freed in the ack callbacks or on the error path below. */
    ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
    assert(ack_req);

    ack_req->parent = ofi_req;
    ack_req->event_callback = ompi_mtl_ofi_send_ack_callback;
    ack_req->error_callback = ompi_mtl_ofi_send_ack_error_callback;

    /* The parent send now also waits for the ack completion. */
    ofi_req->completion_count += 1;

    MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
                                      NULL,
                                      0,
                                      NULL,
                                      *src_addr,
                                      *match_bits | ompi_mtl_ofi.sync_send_ack,
                                      0, /* Exact match, no ignore bits */
                                      (void *) &ack_req->ctx), ret);
    if (OPAL_UNLIKELY(0 > ret)) {
        opal_output_verbose(1, opal_common_ofi.output,
                            "%s:%d: fi_trecv failed: %s(%zd)",
                            __FILE__, __LINE__, fi_strerror(-ret), ret);
        free(ack_req);
        return ompi_mtl_ofi_get_error(ret);
    }

     /* The SYNC_SEND tag bit is set for the send operation only.*/
    MTL_OFI_SET_SYNC_SEND(*match_bits);
    return OMPI_SUCCESS;
}
403 
404 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int dest,int tag,struct opal_convertor_t * convertor,mca_pml_base_send_mode_t mode,bool ofi_cq_data)405 ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl,
406                   struct ompi_communicator_t *comm,
407                   int dest,
408                   int tag,
409                   struct opal_convertor_t *convertor,
410                   mca_pml_base_send_mode_t mode,
411                   bool ofi_cq_data)
412 {
413     ssize_t ret = OMPI_SUCCESS;
414     ompi_mtl_ofi_request_t ofi_req;
415     int ompi_ret, ctxt_id = 0;
416     void *start;
417     bool free_after;
418     size_t length;
419     uint64_t match_bits;
420     ompi_proc_t *ompi_proc = NULL;
421     mca_mtl_ofi_endpoint_t *endpoint = NULL;
422     ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
423     fi_addr_t src_addr = 0;
424     fi_addr_t sep_peer_fiaddr = 0;
425 
426     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
427     set_thread_context(ctxt_id);
428 
429     /**
430      * Create a send request, start it and wait until it completes.
431      */
432     ofi_req.event_callback = ompi_mtl_ofi_send_callback;
433     ofi_req.error_callback = ompi_mtl_ofi_send_error_callback;
434 
435     ompi_proc = ompi_comm_peer_lookup(comm, dest);
436     endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
437 
438     /* For Scalable Endpoints, gather target receive context */
439     sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
440 
441     ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
442     if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
443         return ompi_ret;
444     }
445 
446     ofi_req.buffer = (free_after) ? start : NULL;
447     ofi_req.length = length;
448     ofi_req.status.MPI_ERROR = OMPI_SUCCESS;
449     ofi_req.completion_count = 0;
450 
451     if (OPAL_UNLIKELY(length > endpoint->mtl_ofi_module->max_msg_size)) {
452         opal_show_help("help-mtl-ofi.txt",
453             "message too big", false,
454             length, endpoint->mtl_ofi_module->max_msg_size);
455         return OMPI_ERROR;
456     }
457 
458     if (ofi_cq_data) {
459         match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
460         src_addr = sep_peer_fiaddr;
461     } else {
462         match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
463                                              comm->c_my_rank, tag);
464         /* src_addr is ignored when FI_DIRECTED_RECV is not supported */
465     }
466 
467     if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
468         ofi_req.status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &src_addr,
469                                                            &ofi_req, endpoint,
470                                                            &match_bits, tag);
471         if (OPAL_UNLIKELY(ofi_req.status.MPI_ERROR != OMPI_SUCCESS))
472             goto free_request_buffer;
473     }
474 
475     if (ompi_mtl_ofi.max_inject_size >= length) {
476         if (ofi_cq_data) {
477             MTL_OFI_RETRY_UNTIL_DONE(fi_tinjectdata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
478                                             start,
479                                             length,
480                                             comm->c_my_rank,
481                                             sep_peer_fiaddr,
482                                             match_bits), ret);
483         } else {
484             MTL_OFI_RETRY_UNTIL_DONE(fi_tinject(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
485                                             start,
486                                             length,
487                                             sep_peer_fiaddr,
488                                             match_bits), ret);
489         }
490         if (OPAL_UNLIKELY(0 > ret)) {
491             MTL_OFI_LOG_FI_ERR(ret,
492                                ofi_cq_data ? "fi_tinjectdata failed"
493                                : "fi_tinject failed");
494             if (ack_req) {
495                 fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, &ack_req->ctx);
496                 free(ack_req);
497             }
498 
499             ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
500             goto free_request_buffer;
501         }
502     } else {
503         ofi_req.completion_count += 1;
504         if (ofi_cq_data) {
505             MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
506                                           start,
507                                           length,
508                                           NULL,
509                                           comm->c_my_rank,
510                                           sep_peer_fiaddr,
511                                           match_bits,
512                                           (void *) &ofi_req.ctx), ret);
513         } else {
514             MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
515                                           start,
516                                           length,
517                                           NULL,
518                                           sep_peer_fiaddr,
519                                           match_bits,
520                                           (void *) &ofi_req.ctx), ret);
521         }
522         if (OPAL_UNLIKELY(0 > ret)) {
523             MTL_OFI_LOG_FI_ERR(ret,
524                                ofi_cq_data ? "fi_tsenddata failed"
525                                : "fi_tsend failed");
526             ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
527             goto free_request_buffer;
528         }
529     }
530 
531     /**
532      * Wait until the request is completed.
533      * ompi_mtl_ofi_send_callback() updates this variable.
534      */
535     while (0 < ofi_req.completion_count) {
536         ompi_mtl_ofi_progress();
537     }
538 
539 free_request_buffer:
540     if (OPAL_UNLIKELY(NULL != ofi_req.buffer)) {
541         free(ofi_req.buffer);
542     }
543 
544     return ofi_req.status.MPI_ERROR;
545 }
546 
547 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int dest,int tag,struct opal_convertor_t * convertor,mca_pml_base_send_mode_t mode,bool blocking,mca_mtl_request_t * mtl_request,bool ofi_cq_data)548 ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl,
549                    struct ompi_communicator_t *comm,
550                    int dest,
551                    int tag,
552                    struct opal_convertor_t *convertor,
553                    mca_pml_base_send_mode_t mode,
554                    bool blocking,
555                    mca_mtl_request_t *mtl_request,
556                    bool ofi_cq_data)
557 {
558     ssize_t ret = OMPI_SUCCESS;
559     ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t *) mtl_request;
560     int ompi_ret, ctxt_id = 0;
561     void *start;
562     size_t length;
563     bool free_after;
564     uint64_t match_bits;
565     ompi_proc_t *ompi_proc = NULL;
566     mca_mtl_ofi_endpoint_t *endpoint = NULL;
567     ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
568     fi_addr_t sep_peer_fiaddr = 0;
569 
570     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
571     set_thread_context(ctxt_id);
572 
573     ofi_req->event_callback = ompi_mtl_ofi_isend_callback;
574     ofi_req->error_callback = ompi_mtl_ofi_send_error_callback;
575 
576     ompi_proc = ompi_comm_peer_lookup(comm, dest);
577     endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
578 
579     /* For Scalable Endpoints, gather target receive context */
580     sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
581 
582     ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
583     if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) return ompi_ret;
584 
585     ofi_req->buffer = (free_after) ? start : NULL;
586     ofi_req->length = length;
587     ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
588     ofi_req->completion_count = 1;
589 
590     if (OPAL_UNLIKELY(length > endpoint->mtl_ofi_module->max_msg_size)) {
591         opal_show_help("help-mtl-ofi.txt",
592             "message too big", false,
593             length, endpoint->mtl_ofi_module->max_msg_size);
594         return OMPI_ERROR;
595     }
596 
597     if (ofi_cq_data) {
598         match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
599     } else {
600         match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
601                           comm->c_my_rank, tag);
602         /* src_addr is ignored when FI_DIRECTED_RECV  is not supported */
603     }
604 
605     if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
606         ofi_req->status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &sep_peer_fiaddr,
607                                                            ofi_req, endpoint,
608                                                            &match_bits, tag);
609         if (OPAL_UNLIKELY(ofi_req->status.MPI_ERROR != OMPI_SUCCESS))
610             goto free_request_buffer;
611     }
612 
613     if (ofi_cq_data) {
614         MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
615                                       start,
616                                       length,
617                                       NULL,
618                                       comm->c_my_rank,
619                                       sep_peer_fiaddr,
620                                       match_bits,
621                                       (void *) &ofi_req->ctx), ret);
622     } else {
623         MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
624                                       start,
625                                       length,
626                                       NULL,
627                                       sep_peer_fiaddr,
628                                       match_bits,
629                                       (void *) &ofi_req->ctx), ret);
630     }
631     if (OPAL_UNLIKELY(0 > ret)) {
632         MTL_OFI_LOG_FI_ERR(ret,
633                            ofi_cq_data ? "fi_tsenddata failed"
634                            : "fi_tsend failed");
635         ofi_req->status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
636     }
637 
638 free_request_buffer:
639     if (OPAL_UNLIKELY(OMPI_SUCCESS != ofi_req->status.MPI_ERROR
640             && NULL != ofi_req->buffer)) {
641         free(ofi_req->buffer);
642     }
643 
644     return ofi_req->status.MPI_ERROR;
645 }
646 
647 /**
648  * Called when a completion for a posted recv is received.
649  */
650 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)651 ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
652                            ompi_mtl_ofi_request_t *ofi_req)
653 {
654     int ompi_ret, ctxt_id = 0;
655     ssize_t ret;
656     ompi_proc_t *ompi_proc = NULL;
657     mca_mtl_ofi_endpoint_t *endpoint = NULL;
658     int src = mtl_ofi_get_source(wc);
659     ompi_status_public_t *status = NULL;
660     struct fi_msg_tagged tagged_msg;
661 
662     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(ofi_req->comm->c_contextid);
663 
664     assert(ofi_req->super.ompi_req);
665     status = &ofi_req->super.ompi_req->req_status;
666 
667     /**
668      * Any event associated with a request starts it.
669      * This prevents a started request from being cancelled.
670      */
671     ofi_req->req_started = true;
672 
673     status->MPI_SOURCE = src;
674     status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
675     status->_ucount = wc->len;
676 
677     if (OPAL_UNLIKELY(wc->len > ofi_req->length)) {
678         opal_output_verbose(1, opal_common_ofi.output,
679                             "truncate expected: %ld %ld",
680                             wc->len, ofi_req->length);
681         status->MPI_ERROR = MPI_ERR_TRUNCATE;
682     }
683 
684     /**
685      * Unpack data into recv buffer if necessary.
686      */
687     if (OPAL_UNLIKELY(ofi_req->buffer)) {
688         ompi_ret = ompi_mtl_datatype_unpack(ofi_req->convertor,
689                                             ofi_req->buffer,
690                                             wc->len);
691         if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
692             opal_output_verbose(1, opal_common_ofi.output,
693                                 "%s:%d: ompi_mtl_datatype_unpack failed: %d",
694                                 __FILE__, __LINE__, ompi_ret);
695             status->MPI_ERROR = ompi_ret;
696         }
697     }
698 
699     /**
700     * We can only accept MTL_OFI_SYNC_SEND in the standard recv callback.
701     * MTL_OFI_SYNC_SEND_ACK should only be received in the send_ack
702     * callback.
703     */
704     assert(!MTL_OFI_IS_SYNC_SEND_ACK(wc->tag));
705 
706     /**
707      * If this recv is part of an MPI_Ssend operation, then we send an
708      * acknowledgment back to the sender.
709      * The ack message is sent without generating a completion event in
710      * the completion queue by not setting FI_COMPLETION in the flags to
711      * fi_tsendmsg(FI_SELECTIVE_COMPLETION).
712      * This is done since the 0 byte message requires no
713      * notification on the send side for a successful completion.
714      * If a failure occurs the provider will notify the error
715      * in the cq_readerr during OFI progress. Once the message has been
716      * successfully processed the request is marked as completed.
717      */
718     if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
719         /**
720          * If the recv request was posted for any source,
721          * we need to extract the source's actual address.
722          */
723         if (ompi_mtl_ofi.any_addr == ofi_req->remote_addr) {
724             ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
725             endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
726             ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
727         }
728 
729         tagged_msg.msg_iov = NULL;
730         tagged_msg.desc = NULL;
731         tagged_msg.iov_count = 0;
732         tagged_msg.addr = ofi_req->remote_addr;
733         /**
734         * We must continue to use the user's original tag but remove the
735         * sync_send protocol tag bit and instead apply the sync_send_ack
736         * tag bit to complete the initator's sync send receive.
737         */
738         tagged_msg.tag = (wc->tag | ompi_mtl_ofi.sync_send_ack) & ~ompi_mtl_ofi.sync_send;
739         tagged_msg.context = NULL;
740         tagged_msg.data = 0;
741 
742         MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
743                                  &tagged_msg, 0), ret);
744         if (OPAL_UNLIKELY(0 > ret)) {
745             MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed");
746             status->MPI_ERROR = OMPI_ERROR;
747         }
748     }
749 
750     ofi_req->super.completion_callback(&ofi_req->super);
751 
752     return OMPI_SUCCESS;
753 }
754 
755 /**
756  * Called when an error occured on a recv request.
757  */
758 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)759 ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry *error,
760                                  ompi_mtl_ofi_request_t *ofi_req)
761 {
762     ompi_status_public_t *status;
763     assert(ofi_req->super.ompi_req);
764     status = &ofi_req->super.ompi_req->req_status;
765     status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
766     status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error);
767 
768     switch (error->err) {
769         case FI_ETRUNC:
770             status->MPI_ERROR = MPI_ERR_TRUNCATE;
771             break;
772         case FI_ECANCELED:
773             status->_cancelled = true;
774             break;
775         default:
776             status->MPI_ERROR = MPI_ERR_INTERN;
777     }
778 
779     ofi_req->super.completion_callback(&ofi_req->super);
780     return OMPI_SUCCESS;
781 }
782 
/**
 * Post a non-blocking receive.
 *
 * @param mtl         MTL module (used for endpoint lookup).
 * @param comm        Communicator the receive is posted on.
 * @param src         Source rank, or MPI_ANY_SOURCE.
 * @param tag         MPI tag to match.
 * @param convertor   Datatype convertor describing the user buffer.
 * @param mtl_request Pre-allocated MTL-level request (cast to OFI request).
 * @param ofi_cq_data When true, the source rank travels in CQ data and
 *                    FI_DIRECTED_RECV-style addressing is used; otherwise
 *                    the source is encoded in the tag bits.
 *
 * @return OMPI_SUCCESS, or an error from buffer preparation / fi_trecv.
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_irecv_generic(struct mca_mtl_base_module_t *mtl,
                   struct ompi_communicator_t *comm,
                   int src,
                   int tag,
                   struct opal_convertor_t *convertor,
                   mca_mtl_request_t *mtl_request,
                   bool ofi_cq_data)
{
    int ompi_ret = OMPI_SUCCESS, ctxt_id = 0;
    ssize_t ret;
    uint64_t match_bits, mask_bits;
    /* Default to matching any source address; narrowed below if possible. */
    fi_addr_t remote_addr = ompi_mtl_ofi.any_addr;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
    void *start;
    size_t length;
    bool free_after;

    /* Select the OFI context assigned to this communicator. */
    ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
    set_thread_context(ctxt_id);

    if (ofi_cq_data) {
        /* With CQ data, a known source can be matched by address. */
        if (MPI_ANY_SOURCE != src) {
            ompi_proc = ompi_comm_peer_lookup(comm, src);
            endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
            remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
        }

        mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
                                    tag);
    } else {
        /* Without CQ data, the source rank is folded into the tag bits. */
        mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
                                tag);
        /* src_addr is ignored when FI_DIRECTED_RECV is not used */
    }

    /* Obtain a contiguous receive buffer; may allocate a bounce buffer
     * (free_after == true) for non-contiguous datatypes. */
    ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
                                          &start,
                                          &length,
                                          &free_after);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
        return ompi_ret;
    }

    /* Fill in the request before posting: the completion handlers may
     * run from progress as soon as fi_trecv succeeds. */
    ofi_req->type = OMPI_MTL_OFI_RECV;
    ofi_req->event_callback = ompi_mtl_ofi_recv_callback;
    ofi_req->error_callback = ompi_mtl_ofi_recv_error_callback;
    ofi_req->comm = comm;
    ofi_req->buffer = (free_after) ? start : NULL;
    ofi_req->length = length;
    ofi_req->convertor = convertor;
    ofi_req->req_started = false;
    ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
    ofi_req->remote_addr = remote_addr;
    ofi_req->match_bits = match_bits;

    /* Post the tagged receive, retrying on FI_EAGAIN. */
    MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
                                      start,
                                      length,
                                      NULL,
                                      remote_addr,
                                      match_bits,
                                      mask_bits,
                                      (void *)&ofi_req->ctx), ret);
    if (OPAL_UNLIKELY(0 > ret)) {
        /* Release the bounce buffer (if any) since the receive was
         * never posted. */
        if (NULL != ofi_req->buffer) {
            free(ofi_req->buffer);
        }
        MTL_OFI_LOG_FI_ERR(ret, "fi_trecv failed");
        return ompi_mtl_ofi_get_error(ret);
    }

    return OMPI_SUCCESS;
}
859 
860 /**
861  * Called when a mrecv request completes.
862  */
863 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)864 ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
865                             ompi_mtl_ofi_request_t *ofi_req)
866 {
867     struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
868     ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
869     status->MPI_SOURCE = mtl_ofi_get_source(wc);
870     status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
871     status->MPI_ERROR = MPI_SUCCESS;
872     status->_ucount = wc->len;
873 
874     free(ofi_req);
875 
876     mrecv_req->completion_callback(mrecv_req);
877 
878     return OMPI_SUCCESS;
879 }
880 
881 /**
882  * Called when an error occured on a mrecv request.
883  */
884 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_mrecv_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)885 ompi_mtl_ofi_mrecv_error_callback(struct fi_cq_err_entry *error,
886                                   ompi_mtl_ofi_request_t *ofi_req)
887 {
888     struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
889     ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
890     status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
891     status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry  *) error);
892 
893     switch (error->err) {
894         case FI_ETRUNC:
895             status->MPI_ERROR = MPI_ERR_TRUNCATE;
896             break;
897         case FI_ECANCELED:
898             status->_cancelled = true;
899             break;
900         default:
901             status->MPI_ERROR = MPI_ERR_INTERN;
902     }
903 
904     free(ofi_req);
905 
906     mrecv_req->completion_callback(mrecv_req);
907 
908     return OMPI_SUCCESS;
909 }
910 
911 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t * mtl,struct opal_convertor_t * convertor,struct ompi_message_t ** message,struct mca_mtl_request_t * mtl_request)912 ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
913                     struct opal_convertor_t *convertor,
914                     struct ompi_message_t **message,
915                     struct mca_mtl_request_t *mtl_request)
916 {
917     ompi_mtl_ofi_request_t *ofi_req =
918         (ompi_mtl_ofi_request_t *)(*message)->req_ptr;
919     void *start;
920     size_t length;
921     bool free_after;
922     struct iovec iov;
923     struct fi_msg_tagged msg;
924     int ompi_ret, ctxt_id = 0;
925     ssize_t ret;
926     uint64_t msgflags = FI_CLAIM | FI_COMPLETION;
927     struct ompi_communicator_t *comm = (*message)->comm;
928 
929     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
930     set_thread_context(ctxt_id);
931 
932     ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
933                                           &start,
934                                           &length,
935                                           &free_after);
936     if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
937         return ompi_ret;
938     }
939 
940     ofi_req->type = OMPI_MTL_OFI_RECV;
941     ofi_req->event_callback = ompi_mtl_ofi_mrecv_callback;
942     ofi_req->error_callback = ompi_mtl_ofi_mrecv_error_callback;
943     ofi_req->buffer = (free_after) ? start : NULL;
944     ofi_req->length = length;
945     ofi_req->convertor = convertor;
946     ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
947     ofi_req->mrecv_req = mtl_request;
948 
949     /**
950      * fi_trecvmsg with FI_CLAIM
951      */
952     iov.iov_base = start;
953     iov.iov_len = length;
954     msg.msg_iov = &iov;
955     msg.desc = NULL;
956     msg.iov_count = 1;
957     msg.addr = 0;
958     msg.tag = ofi_req->match_bits;
959     msg.ignore = ofi_req->mask_bits;
960     msg.context = (void *)&ofi_req->ctx;
961     msg.data = 0;
962 
963     MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
964     if (OPAL_UNLIKELY(0 > ret)) {
965         MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
966         return ompi_mtl_ofi_get_error(ret);
967     }
968 
969     *message = MPI_MESSAGE_NULL;
970 
971     return OMPI_SUCCESS;
972 }
973 
974 /**
975  * Called when a probe request completes.
976  */
977 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)978 ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry *wc,
979                             ompi_mtl_ofi_request_t *ofi_req)
980 {
981     ofi_req->match_state = 1;
982     ofi_req->match_bits = wc->tag;
983     ofi_req->status.MPI_SOURCE = mtl_ofi_get_source(wc);
984     ofi_req->status.MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
985     ofi_req->status.MPI_ERROR = MPI_SUCCESS;
986     ofi_req->status._ucount = wc->len;
987     ofi_req->completion_count--;
988 
989     return OMPI_SUCCESS;
990 }
991 
992 /**
993  * Called when a probe request encounters an error.
994  */
995 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_probe_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)996 ompi_mtl_ofi_probe_error_callback(struct fi_cq_err_entry *error,
997                                   ompi_mtl_ofi_request_t *ofi_req)
998 {
999     ofi_req->completion_count--;
1000 
1001     /*
1002      * Receives posted with FI_PEEK and friends will get an error
1003      * completion with FI_ENOMSG. This just indicates the lack of a match for
1004      * the probe and is not an error case. All other error cases are
1005      * provider-internal errors and should be flagged as such.
1006      */
1007     if (error->err == FI_ENOMSG)
1008         return OMPI_SUCCESS;
1009 
1010     ofi_req->status.MPI_ERROR = MPI_ERR_INTERN;
1011 
1012     return OMPI_ERROR;
1013 }
1014 
/**
 * Probe for a matching message without receiving it (MPI_Iprobe).
 *
 * Posts an fi_trecvmsg with FI_PEEK and spins on opal_progress() until
 * the peek completes, so despite the name this call blocks until the
 * probe result (match or no match) is known.
 *
 * @param mtl         MTL module (used for endpoint lookup).
 * @param comm        Communicator to probe on.
 * @param src         Source rank, or MPI_ANY_SOURCE.
 * @param tag         MPI tag to match.
 * @param flag        Out: 1 if a matching message exists, 0 otherwise.
 * @param status      Out: envelope of the matched message (may be
 *                    MPI_STATUS_IGNORE).
 * @param ofi_cq_data Selects CQ-data vs tag-encoded source matching.
 *
 * @return OMPI_SUCCESS, or an error from fi_trecvmsg.
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_iprobe_generic(struct mca_mtl_base_module_t *mtl,
                    struct ompi_communicator_t *comm,
                    int src,
                    int tag,
                    int *flag,
                    struct ompi_status_public_t *status,
                    bool ofi_cq_data)
{
    /* Stack allocation is safe: this function waits for the probe to
     * complete before returning, so ofi_req cannot outlive the call. */
    struct ompi_mtl_ofi_request_t ofi_req;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
    uint64_t match_bits, mask_bits;
    ssize_t ret;
    struct fi_msg_tagged msg;
    uint64_t msgflags = FI_PEEK | FI_COMPLETION;
    int ctxt_id = 0;

    /* Select the OFI context assigned to this communicator. */
    ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
    set_thread_context(ctxt_id);

    if (ofi_cq_data) {
     /* If the source is known, use its peer_fiaddr. */
        if (MPI_ANY_SOURCE != src) {
            ompi_proc = ompi_comm_peer_lookup( comm, src );
            endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
            remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
        }

        mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
                                    tag);
    }
    else {
        mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
                                tag);
        /* src_addr is ignored when FI_DIRECTED_RECV is not used */
    }

    /**
     * fi_trecvmsg with FI_PEEK:
     * Initiate a search for a match in the hardware or software queue.
     * If successful, libfabric will enqueue a context entry into the completion
     * queue to make the search nonblocking.  This code will poll until the
     * entry is enqueued.
     */
    msg.msg_iov = NULL;
    msg.desc = NULL;
    msg.iov_count = 0;
    msg.addr = remote_proc;
    msg.tag = match_bits;
    msg.ignore = mask_bits;
    msg.context = (void *)&ofi_req.ctx;
    msg.data = 0;

    ofi_req.type = OMPI_MTL_OFI_PROBE;
    ofi_req.event_callback = ompi_mtl_ofi_probe_callback;
    ofi_req.error_callback = ompi_mtl_ofi_probe_error_callback;
    ofi_req.completion_count = 1;
    ofi_req.match_state = 0;

    MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
    if (OPAL_UNLIKELY(0 > ret)) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
        return ompi_mtl_ofi_get_error(ret);
    }

    /* Spin until the peek completion (or FI_ENOMSG error) drains the
     * completion count via the probe callbacks. */
    while (0 < ofi_req.completion_count) {
        opal_progress();
    }

    *flag = ofi_req.match_state;
    if (1 == *flag) {
        if (MPI_STATUS_IGNORE != status) {
            *status = ofi_req.status;
        }
    }

    return OMPI_SUCCESS;
}
1095 
1096 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_improbe_generic(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int src,int tag,int * matched,struct ompi_message_t ** message,struct ompi_status_public_t * status,bool ofi_cq_data)1097 ompi_mtl_ofi_improbe_generic(struct mca_mtl_base_module_t *mtl,
1098                      struct ompi_communicator_t *comm,
1099                      int src,
1100                      int tag,
1101                      int *matched,
1102                      struct ompi_message_t **message,
1103                      struct ompi_status_public_t *status,
1104                      bool ofi_cq_data)
1105 {
1106     struct ompi_mtl_ofi_request_t *ofi_req;
1107     ompi_proc_t *ompi_proc = NULL;
1108     mca_mtl_ofi_endpoint_t *endpoint = NULL;
1109     fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
1110     uint64_t match_bits, mask_bits;
1111     ssize_t ret;
1112     struct fi_msg_tagged msg;
1113     uint64_t msgflags = FI_PEEK | FI_CLAIM | FI_COMPLETION;
1114     int ctxt_id = 0;
1115 
1116     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
1117     set_thread_context(ctxt_id);
1118 
1119     ofi_req = malloc(sizeof *ofi_req);
1120     if (NULL == ofi_req) {
1121         return OMPI_ERROR;
1122     }
1123 
1124     /**
1125      * If the source is known, use its peer_fiaddr.
1126      */
1127 
1128     if (ofi_cq_data) {
1129         if (MPI_ANY_SOURCE != src) {
1130             ompi_proc = ompi_comm_peer_lookup( comm, src );
1131             endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
1132             remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1133         }
1134 
1135         mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
1136                                     tag);
1137     }
1138     else {
1139         /* src_addr is ignored when FI_DIRECTED_RECV is not used */
1140         mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
1141                                 tag);
1142     }
1143 
1144     /**
1145      * fi_trecvmsg with FI_PEEK and FI_CLAIM:
1146      * Initiate a search for a match in the hardware or software queue.
1147      * If successful, libfabric will enqueue a context entry into the completion
1148      * queue to make the search nonblocking.  This code will poll until the
1149      * entry is enqueued.
1150      */
1151     msg.msg_iov = NULL;
1152     msg.desc = NULL;
1153     msg.iov_count = 0;
1154     msg.addr = remote_proc;
1155     msg.tag = match_bits;
1156     msg.ignore = mask_bits;
1157     msg.context = (void *)&ofi_req->ctx;
1158     msg.data = 0;
1159 
1160     ofi_req->type = OMPI_MTL_OFI_PROBE;
1161     ofi_req->event_callback = ompi_mtl_ofi_probe_callback;
1162     ofi_req->error_callback = ompi_mtl_ofi_probe_error_callback;
1163     ofi_req->completion_count = 1;
1164     ofi_req->match_state = 0;
1165     ofi_req->mask_bits = mask_bits;
1166 
1167     MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
1168     if (OPAL_UNLIKELY(0 > ret)) {
1169         MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
1170         free(ofi_req);
1171         return ompi_mtl_ofi_get_error(ret);
1172     }
1173 
1174     while (0 < ofi_req->completion_count) {
1175         opal_progress();
1176     }
1177 
1178     *matched = ofi_req->match_state;
1179     if (1 == *matched) {
1180         if (MPI_STATUS_IGNORE != status) {
1181             *status = ofi_req->status;
1182         }
1183 
1184         (*message) = ompi_message_alloc();
1185         if (NULL == (*message)) {
1186             return OMPI_ERR_OUT_OF_RESOURCE;
1187         }
1188 
1189         (*message)->comm = comm;
1190         (*message)->req_ptr = ofi_req;
1191         (*message)->peer = ofi_req->status.MPI_SOURCE;
1192         (*message)->count = ofi_req->status._ucount;
1193 
1194     } else {
1195         (*message) = MPI_MESSAGE_NULL;
1196         free(ofi_req);
1197     }
1198 
1199     return OMPI_SUCCESS;
1200 }
1201 
1202 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t * mtl,mca_mtl_request_t * mtl_request,int flag)1203 ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t *mtl,
1204                     mca_mtl_request_t *mtl_request,
1205                     int flag)
1206 {
1207     int ret, ctxt_id = 0;
1208     ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
1209 
1210     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(ofi_req->comm->c_contextid);
1211 
1212     switch (ofi_req->type) {
1213         case OMPI_MTL_OFI_SEND:
1214             /**
1215              * Cannot cancel sends yet
1216              */
1217             break;
1218 
1219         case OMPI_MTL_OFI_RECV:
1220             /**
1221              * Cancel a receive request only if it hasn't been matched yet.
1222              * The event queue needs to be drained to make sure there isn't
1223              * any pending receive completion event.
1224              */
1225             ompi_mtl_ofi_progress();
1226 
1227             if (!ofi_req->req_started) {
1228                 ret = fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
1229                                &ofi_req->ctx);
1230                 if (0 == ret) {
1231                     if (ofi_req->req_started)
1232                         goto ofi_cancel_not_possible;
1233                 } else {
1234 ofi_cancel_not_possible:
1235                     /**
1236                      * Could not cancel the request.
1237                      */
1238                     ofi_req->super.ompi_req->req_status._cancelled = false;
1239                 }
1240             }
1241             break;
1242 
1243         default:
1244             return OMPI_ERROR;
1245     }
1246 
1247     return OMPI_SUCCESS;
1248 }
1249 
/**
 * Create and enable the OFI TX/RX context pair (plus CQ) backing a
 * communicator, and register the progress function for MPI_COMM_WORLD.
 *
 * For OFI_REGULAR_EP, the TX/RX contexts alias the single endpoint, so
 * only the per-context lock and progress registration are set up.  For
 * scalable endpoints, a fresh tx/rx context and CQ are created and
 * bound, up to the user-configured context limit.
 *
 * @return OMPI_SUCCESS, or a libfabric error code on failure (partially
 *         created resources are closed before returning).
 */
static int ompi_mtl_ofi_init_contexts(struct mca_mtl_base_module_t *mtl,
                                      struct ompi_communicator_t *comm,
                                      mca_mtl_ofi_ep_type ep_type)
{
    int ret;
    /* Next free slot in the context table. */
    int ctxt_id = ompi_mtl_ofi.total_ctxts_used;
    struct fi_cq_attr cq_attr = {0};
    cq_attr.format = FI_CQ_FORMAT_TAGGED;
    cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;

    if (OFI_REGULAR_EP == ep_type) {
        /*
         * For regular endpoints, just create the Lock object and register
         * progress function.
         */
        goto init_regular_ep;
    }

    /*
     * We only create upto Max number of contexts asked for by the user.
     * If user enables thread grouping feature and creates more number of
     * communicators than available contexts, then we set the threshold
     * context_id so that new communicators created beyond the threshold
     * will be assigned to contexts in a round-robin fashion.
     */
    if (ompi_mtl_ofi.num_ofi_contexts <= ompi_mtl_ofi.total_ctxts_used) {
        ompi_mtl_ofi.comm_to_context[comm->c_contextid] = comm->c_contextid %
                                                          ompi_mtl_ofi.total_ctxts_used;
        if (!ompi_mtl_ofi.threshold_comm_context_id) {
            ompi_mtl_ofi.threshold_comm_context_id = comm->c_contextid;

            /* Warn once that the context limit was reached. */
            opal_show_help("help-mtl-ofi.txt", "SEP thread grouping ctxt limit", true, ctxt_id,
                           ompi_process_info.nodename, __FILE__, __LINE__);
        }

        return OMPI_SUCCESS;
    }

    /* Init context info for Scalable EPs */
    ret = fi_tx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, NULL);
    if (ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_tx_context failed");
        goto init_error;
    }

    ret = fi_rx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, NULL);
    if (ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_rx_context failed");
        goto init_error;
    }

    /* One CQ serves both the TX and RX context. */
    ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, NULL);
    if (ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_cq_open failed");
        goto init_error;
    }

    /* Bind CQ to TX/RX context object */
    ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
                     FI_TRANSMIT | FI_SELECTIVE_COMPLETION);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_TRANSMIT) failed");
        goto init_error;
    }

    ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
                     FI_RECV | FI_SELECTIVE_COMPLETION);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_RECV) failed");
        goto init_error;
    }

    /* Enable Endpoint for communication. This commits the bind operations */
    ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_enable (send context) failed");
        goto init_error;
    }

    ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_enable (recv context) failed");
        goto init_error;
    }

init_regular_ep:
    /* Initialize per-context lock */
    OBJ_CONSTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock, opal_mutex_t);

    if (MPI_COMM_WORLD == comm) {
        ret = opal_progress_register(ompi_mtl_ofi_progress_no_inline);
        if (OMPI_SUCCESS != ret) {
            opal_output_verbose(1, opal_common_ofi.output,
                                "%s:%d: opal_progress_register failed: %d\n",
                                __FILE__, __LINE__, ret);
            goto init_error;
        }
    }

    /* Record the mapping and consume the context slot. */
    ompi_mtl_ofi.comm_to_context[comm->c_contextid] = ompi_mtl_ofi.total_ctxts_used;
    ompi_mtl_ofi.total_ctxts_used++;

    return OMPI_SUCCESS;

init_error:
    /* Tear down whatever was created before the failure.
     * NOTE(review): assumes the ofi_ctxt slots start out zeroed so the
     * NULL checks are meaningful — confirm at the allocation site. */
    if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep) {
        (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
    }

    if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep) {
        (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
    }

    if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq) {
        (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq);
    }

    return ret;
}
1369 
/**
 * Release the OFI context resources associated with a communicator.
 *
 * For OFI_REGULAR_EP only the per-context lock is destroyed (the TX/RX
 * contexts alias the endpoint closed in the component finalize).  For
 * scalable endpoints the tx/rx contexts and CQ are closed, unless the
 * communicator is beyond the thread-grouping threshold and therefore
 * shares a context owned by another communicator.
 *
 * @return OMPI_SUCCESS, or OMPI_ERROR if an fi_close call fails.
 */
static int ompi_mtl_ofi_finalize_contexts(struct mca_mtl_base_module_t *mtl,
                                          struct ompi_communicator_t *comm,
                                          mca_mtl_ofi_ep_type ep_type)
{
    int ret = OMPI_SUCCESS, ctxt_id = 0;

    if (OFI_REGULAR_EP == ep_type) {
        /* For regular EPs, simply destruct Lock object and exit */
        goto finalize_regular_ep;
    }

    /* Communicators past the threshold share a round-robin-assigned
     * context; they own no resources of their own to release. */
    if (ompi_mtl_ofi.thread_grouping &&
        ompi_mtl_ofi.threshold_comm_context_id &&
        ((uint32_t) ompi_mtl_ofi.threshold_comm_context_id <= comm->c_contextid)) {
        return OMPI_SUCCESS;
    }

    ctxt_id = ompi_mtl_ofi.thread_grouping ?
           ompi_mtl_ofi.comm_to_context[comm->c_contextid] : 0;

    /*
     * For regular EPs, TX/RX contexts are aliased to SEP object which is
     * closed in ompi_mtl_ofi_finalize(). So, skip handling those here.
     */
    if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep))) {
        goto finalize_err;
    }

    if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep))) {
        goto finalize_err;
    }

    if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq))) {
        goto finalize_err;
    }

finalize_regular_ep:
    /* Destroy context lock */
    OBJ_DESTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock);

    return OMPI_SUCCESS;

finalize_err:
    opal_show_help("help-mtl-ofi.txt", "OFI call fail", true,
                   "fi_close",
                   ompi_process_info.nodename, __FILE__, __LINE__,
                   fi_strerror(-ret), ret);

    return OMPI_ERROR;
}
1420 
1421 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm)1422 ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl,
1423                       struct ompi_communicator_t *comm)
1424 {
1425     int ret;
1426     mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ?
1427                                   OFI_REGULAR_EP : OFI_SCALABLE_EP;
1428 
1429     /*
1430      * If thread grouping enabled, add new OFI context for each communicator
1431      * other than MPI_COMM_SELF.
1432      */
1433     if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
1434         /* If no thread grouping, add new OFI context only
1435          * for MPI_COMM_WORLD.
1436          */
1437         (!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
1438 
1439         ret = ompi_mtl_ofi_init_contexts(mtl, comm, ep_type);
1440 
1441         if (OMPI_SUCCESS != ret) {
1442             goto error;
1443         }
1444     }
1445 
1446     return OMPI_SUCCESS;
1447 
1448 error:
1449     return OMPI_ERROR;
1450 }
1451 
1452 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm)1453 ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl,
1454                       struct ompi_communicator_t *comm)
1455 {
1456     int ret = OMPI_SUCCESS;
1457     mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ?
1458                                   OFI_REGULAR_EP : OFI_SCALABLE_EP;
1459 
1460     /*
1461      * Clean up OFI contexts information.
1462      */
1463     if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
1464         (!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
1465 
1466         ret = ompi_mtl_ofi_finalize_contexts(mtl, comm, ep_type);
1467     }
1468 
1469     return ret;
1470 }
1471 
1472 #ifdef MCA_ompi_mtl_DIRECT_CALL
1473 
1474 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int dest,int tag,struct opal_convertor_t * convertor,mca_pml_base_send_mode_t mode)1475 ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
1476                   struct ompi_communicator_t *comm,
1477                   int dest,
1478                   int tag,
1479                   struct opal_convertor_t *convertor,
1480                   mca_pml_base_send_mode_t mode)
1481 {
1482     return ompi_mtl_ofi_send_generic(mtl, comm, dest, tag,
1483                                     convertor, mode,
1484                                     ompi_mtl_ofi.fi_cq_data);
1485 }
1486 
1487 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_isend(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int dest,int tag,struct opal_convertor_t * convertor,mca_pml_base_send_mode_t mode,bool blocking,mca_mtl_request_t * mtl_request)1488 ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
1489                struct ompi_communicator_t *comm,
1490                int dest,
1491                int tag,
1492                struct opal_convertor_t *convertor,
1493                mca_pml_base_send_mode_t mode,
1494                bool blocking,
1495                mca_mtl_request_t *mtl_request)
1496 {
1497     return ompi_mtl_ofi_isend_generic(mtl, comm, dest, tag,
1498                                     convertor, mode, blocking, mtl_request,
1499                                     ompi_mtl_ofi.fi_cq_data);
1500 }
1501 
1502 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int src,int tag,struct opal_convertor_t * convertor,mca_mtl_request_t * mtl_request)1503 ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
1504                struct ompi_communicator_t *comm,
1505                int src,
1506                int tag,
1507                struct opal_convertor_t *convertor,
1508                mca_mtl_request_t *mtl_request)
1509 {
1510     return ompi_mtl_ofi_irecv_generic(mtl, comm, src, tag,
1511                                     convertor, mtl_request,
1512                                     ompi_mtl_ofi.fi_cq_data);
1513 }
1514 
1515 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int src,int tag,int * flag,struct ompi_status_public_t * status)1516 ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
1517                 struct ompi_communicator_t *comm,
1518                 int src,
1519                 int tag,
1520                 int *flag,
1521                 struct ompi_status_public_t *status)
1522 {
1523     return ompi_mtl_ofi_iprobe_generic(mtl, comm, src, tag,
1524                                     flag, status,
1525                                     ompi_mtl_ofi.fi_cq_data);
1526 }
1527 
1528 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int src,int tag,int * matched,struct ompi_message_t ** message,struct ompi_status_public_t * status)1529 ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
1530                  struct ompi_communicator_t *comm,
1531                  int src,
1532                  int tag,
1533                  int *matched,
1534                  struct ompi_message_t **message,
1535                  struct ompi_status_public_t *status)
1536 {
1537     return ompi_mtl_ofi_improbe_generic(mtl, comm, src, tag,
1538                                     matched, message, status,
1539                                     ompi_mtl_ofi.fi_cq_data);
1540 }
1541 #endif
1542 
1543 END_C_DECLS
1544 
1545 #endif  /* MTL_OFI_H_HAS_BEEN_INCLUDED */
1546