1 /*
2 * Copyright (c) 2013-2018 Intel, Inc. All rights reserved
3 * Copyright (c) 2017 Los Alamos National Security, LLC. All rights
4 * reserved.
5 * Copyright (c) 2019-2020 Triad National Security, LLC. All rights
6 * reserved.
7 * Copyright (c) 2018-2020 Amazon.com, Inc. or its affiliates. All rights
8 * reserved.
9 *
10 * $COPYRIGHT$
11 *
12 * Additional copyrights may follow
13 *
14 * $HEADER$
15 */
16
17 #ifndef MTL_OFI_H_HAS_BEEN_INCLUDED
18 #define MTL_OFI_H_HAS_BEEN_INCLUDED
19
20 #include "ompi/mca/mtl/mtl.h"
21 #include "ompi/mca/mtl/base/base.h"
22 #include "opal/datatype/opal_convertor.h"
23 #include "opal/util/show_help.h"
24
25 #include <rdma/fabric.h>
26 #include <rdma/fi_cm.h>
27 #include <rdma/fi_domain.h>
28 #include <rdma/fi_endpoint.h>
29 #include <rdma/fi_errno.h>
30 #include <rdma/fi_tagged.h>
31
32 #include "ompi_config.h"
33 #include "ompi/proc/proc.h"
34 #include "ompi/mca/mtl/mtl.h"
35 #include "opal/class/opal_list.h"
36 #include "ompi/communicator/communicator.h"
37 #include "opal/datatype/opal_convertor.h"
38 #include "ompi/mca/mtl/base/base.h"
39 #include "ompi/mca/mtl/base/mtl_base_datatype.h"
40 #include "ompi/message/message.h"
41 #include "opal/mca/common/ofi/common_ofi.h"
42
43 #include "mtl_ofi_opt.h"
44 #include "mtl_ofi_types.h"
45 #include "mtl_ofi_request.h"
46 #include "mtl_ofi_endpoint.h"
47 #include "mtl_ofi_compat.h"
48
49 BEGIN_C_DECLS
50
51 extern mca_mtl_ofi_module_t ompi_mtl_ofi;
52 extern mca_base_framework_t ompi_mtl_base_framework;
53
54 extern int ompi_mtl_ofi_add_procs(struct mca_mtl_base_module_t *mtl,
55 size_t nprocs,
56 struct ompi_proc_t** procs);
57
58 extern int ompi_mtl_ofi_del_procs(struct mca_mtl_base_module_t *mtl,
59 size_t nprocs,
60 struct ompi_proc_t **procs);
61
62 int ompi_mtl_ofi_progress_no_inline(void);
63
64 #if OPAL_HAVE_THREAD_LOCAL
65 extern opal_thread_local int per_thread_ctx;
66 extern opal_thread_local struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
67 #endif
68
69 /* Set OFI context for operations which generate completion events */
70 __opal_attribute_always_inline__ static inline void
set_thread_context(int ctxt)71 set_thread_context(int ctxt)
72 {
73 #if OPAL_HAVE_THREAD_LOCAL
74 per_thread_ctx = ctxt;
75 return;
76 #endif
77 }
78
79 /* Retrieve OFI context to use for CQ poll */
80 __opal_attribute_always_inline__ static inline void
get_thread_context(int * ctxt)81 get_thread_context(int *ctxt)
82 {
83 #if OPAL_HAVE_THREAD_LOCAL
84 *ctxt = per_thread_ctx;
85 #endif
86 return;
87 }
88
/* Try-lock on the per-context mutex; evaluates non-zero when the lock was
 * acquired. Non-blocking so a progress call never stalls on a context that
 * another thread is already draining. */
#define MTL_OFI_CONTEXT_LOCK(ctxt_id) \
    OPAL_LIKELY(!opal_mutex_atomic_trylock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock))

/* Release the per-context mutex acquired via MTL_OFI_CONTEXT_LOCK. */
#define MTL_OFI_CONTEXT_UNLOCK(ctxt_id) \
    opal_mutex_atomic_unlock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock)
94
/*
 * Drain completion events from the CQ bound to OFI context 'ctxt_id'.
 *
 * Returns the number of successful completions processed. Errors reported
 * through the CQ are forwarded to the owning request's error callback;
 * unrecoverable failures abort the job via exit(3).
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_context_progress(int ctxt_id)
{
    int count = 0, i, events_read;
    ompi_mtl_ofi_request_t *ofi_req = NULL;
    struct fi_cq_err_entry error = { 0 };
    ssize_t ret;
#if !OPAL_HAVE_THREAD_LOCAL
    /* No TLS: fall back to a stack-local event array instead of the
     * thread-local 'wc' declared at file scope. */
    struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
#endif

    /**
     * Read the work completions from the CQ.
     * From the completion's op_context, we get the associated OFI request.
     * Call the request's callback.
     */
    while (true) {
        ret = fi_cq_read(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, (void *)&wc,
                         ompi_mtl_ofi.ofi_progress_event_count);
        if (ret > 0) {
            count += ret;
            /* Save the event count: 'ret' is reused for callback results. */
            events_read = ret;
            for (i = 0; i < events_read; i++) {
                if (NULL != wc[i].op_context) {
                    ofi_req = TO_OFI_REQ(wc[i].op_context);
                    assert(ofi_req);
                    ret = ofi_req->event_callback(&wc[i], ofi_req);
                    if (OMPI_SUCCESS != ret) {
                        opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
                                    "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                                    __FILE__, __LINE__, ret);
                        fflush(stderr);
                        exit(1);
                    }
                }
            }
        } else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
            /**
             * An error occurred and is being reported via the CQ.
             * Read the error and forward it to the upper layer.
             */
            ret = fi_cq_readerr(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
                                &error,
                                0);
            if (0 > ret) {
                /*
                 * In multi-threaded scenarios, any thread that attempts to read
                 * a CQ when there's a pending error CQ entry gets an
                 * -FI_EAVAIL. Without any serialization here (which is okay,
                 * since libfabric will protect access to critical CQ objects),
                 * all threads proceed to read from the error CQ, but only one
                 * thread fetches the entry while others get -FI_EAGAIN
                 * indicating an empty queue, which is not erroneous.
                 */
                if (ret == -FI_EAGAIN)
                    return count;
                opal_output(0, "%s:%d: Error returned from fi_cq_readerr: %s(%zd).\n"
                            "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                            __FILE__, __LINE__, fi_strerror(-ret), ret);
                fflush(stderr);
                exit(1);
            }

            assert(error.op_context);
            ofi_req = TO_OFI_REQ(error.op_context);
            assert(ofi_req);
            ret = ofi_req->error_callback(&error, ofi_req);
            if (OMPI_SUCCESS != ret) {
                opal_output(0, "%s:%d: Error returned by request error callback: %zd.\n"
                            "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                            __FILE__, __LINE__, ret);
                fflush(stderr);
                exit(1);
            }
        } else {
            /* -FI_EAGAIN: CQ empty; -EINTR: interrupted read — both mean
             * "stop polling for now". Anything else is fatal. */
            if (ret == -FI_EAGAIN || ret == -EINTR) {
                break;
            } else {
                opal_output(0, "%s:%d: Error returned from fi_cq_read: %s(%zd).\n"
                            "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
                            __FILE__, __LINE__, fi_strerror(-ret), ret);
                fflush(stderr);
                exit(1);
            }
        }
    }

    return count;
}
184
/*
 * Top-level MTL progress hook: poll the CQ of the calling thread's context,
 * and — under MPI_THREAD_MULTIPLE, on every 16th otherwise-idle call —
 * sweep the remaining contexts round-robin.
 *
 * Returns the number of completions processed across all polled contexts.
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_progress(void)
{
    int count = 0, ctxt_id = 0, i;
    /* Shared call counter used only to throttle the round-robin sweep;
     * exactness is not required, so no atomic increment is used. */
    static volatile uint32_t num_calls = 0;

    get_thread_context(&ctxt_id);

    if (ompi_mtl_ofi.mpi_thread_multiple) {
        /* Trylock: if another thread owns this context, skip rather than block. */
        if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
            count += ompi_mtl_ofi_context_progress(ctxt_id);
            MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
        }
    } else {
        count += ompi_mtl_ofi_context_progress(ctxt_id);
    }

#if OPAL_HAVE_THREAD_LOCAL
    /*
     * Try to progress other CQs in round-robin fashion.
     * Progress is only made if no events were read from the CQ
     * local to the calling thread past 16 times.
     */
    if (OPAL_UNLIKELY((count == 0) && ompi_mtl_ofi.mpi_thread_multiple &&
        (((num_calls++) & 0xF) == 0 ))) {
        for (i = 0; i < ompi_mtl_ofi.total_ctxts_used - 1; i++) {
            /* Visit every other context exactly once, starting after ours. */
            ctxt_id = (ctxt_id + 1) % ompi_mtl_ofi.total_ctxts_used;

            if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
                count += ompi_mtl_ofi_context_progress(ctxt_id);
                MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
            }

            /* Upon any work done, exit to let other threads take lock */
            if (OPAL_LIKELY(count > 0)) {
                break;
            }
        }
    }
#endif

    return count;
}
228
/**
 * When attempting to execute an OFI operation we need to handle
 * resource overrun cases. When a call to an OFI OP fails with -FI_EAGAIN
 * the OFI mtl will attempt to progress any pending Completion Queue
 * events that may prevent additional operations to be enqueued.
 * If the call to ofi progress is successful, then the function call
 * will be retried.
 *
 * Fix: the trailing semicolon after "while (0)" defeated the do-while(0)
 * idiom — the call site's own ';' produced an empty statement, breaking
 * un-braced if/else usage. The macro now expands to a single statement.
 */
#define MTL_OFI_RETRY_UNTIL_DONE(FUNC, RETURN)          \
    do {                                                \
        do {                                            \
            RETURN = FUNC;                              \
            if (OPAL_LIKELY(0 == RETURN)) {break;}      \
            if (OPAL_LIKELY(RETURN == -FI_EAGAIN)) {    \
                ompi_mtl_ofi_progress();                \
            }                                           \
        } while (OPAL_LIKELY(-FI_EAGAIN == RETURN));    \
    } while (0)
247
/* Log a libfabric error (err is a negative fi_errno value) with file/line
 * context at verbosity level 1.
 * Fix: dropped the trailing semicolon after "while(0)" so the macro expands
 * to exactly one statement (safe in un-braced if/else bodies). */
#define MTL_OFI_LOG_FI_ERR(err, string)                                     \
    do {                                                                    \
        opal_output_verbose(1, opal_common_ofi.output,                      \
                            "%s:%d:%s: %s\n",                               \
                            __FILE__, __LINE__, string, fi_strerror(-err)); \
    } while (0)
254
255 /* MTL interface functions */
256 int ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl);
257
258 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_get_error(int error_num)259 ompi_mtl_ofi_get_error(int error_num)
260 {
261 int ret;
262
263 switch (error_num) {
264 case 0:
265 ret = OMPI_SUCCESS;
266 break;
267 default:
268 ret = OMPI_ERROR;
269 }
270
271 return ret;
272 }
273
274 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)275 ompi_mtl_ofi_send_callback(struct fi_cq_tagged_entry *wc,
276 ompi_mtl_ofi_request_t *ofi_req)
277 {
278 assert(ofi_req->completion_count > 0);
279 ofi_req->completion_count--;
280 return OMPI_SUCCESS;
281 }
282
283 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)284 ompi_mtl_ofi_send_error_callback(struct fi_cq_err_entry *error,
285 ompi_mtl_ofi_request_t *ofi_req)
286 {
287 switch(error->err) {
288 case FI_ETRUNC:
289 ofi_req->status.MPI_ERROR = MPI_ERR_TRUNCATE;
290 break;
291 default:
292 ofi_req->status.MPI_ERROR = MPI_ERR_INTERN;
293 }
294 return ofi_req->event_callback(NULL, ofi_req);
295 }
296
297 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_ack_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)298 ompi_mtl_ofi_send_ack_callback(struct fi_cq_tagged_entry *wc,
299 ompi_mtl_ofi_request_t *ofi_req)
300 {
301 ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;
302
303 free(ofi_req);
304
305 parent_req->event_callback(NULL, parent_req);
306
307 return OMPI_SUCCESS;
308 }
309
310 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_ack_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)311 ompi_mtl_ofi_send_ack_error_callback(struct fi_cq_err_entry *error,
312 ompi_mtl_ofi_request_t *ofi_req)
313 {
314 ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;
315
316 free(ofi_req);
317
318 parent_req->status.MPI_ERROR = MPI_ERR_INTERN;
319
320 return parent_req->error_callback(error, parent_req);
321 }
322
323 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)324 ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry *wc,
325 ompi_mtl_ofi_request_t *ofi_req)
326 {
327 assert(ofi_req->completion_count > 0);
328 ofi_req->completion_count--;
329
330 if (0 == ofi_req->completion_count) {
331 /* Request completed */
332 if (OPAL_UNLIKELY(NULL != ofi_req->buffer)) {
333 free(ofi_req->buffer);
334 ofi_req->buffer = NULL;
335 }
336
337 ofi_req->super.ompi_req->req_status.MPI_ERROR =
338 ofi_req->status.MPI_ERROR;
339
340 ofi_req->super.completion_callback(&ofi_req->super);
341 }
342
343 return OMPI_SUCCESS;
344 }
345
346 /* Return OFI context ID associated with the specific communicator */
347 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_map_comm_to_ctxt(uint32_t comm_id)348 ompi_mtl_ofi_map_comm_to_ctxt(uint32_t comm_id)
349 {
350 /* For non-thread-grouping use case, only one context is used which is
351 * associated to MPI_COMM_WORLD, so use that. */
352 if (0 == ompi_mtl_ofi.thread_grouping) {
353 comm_id = 0;
354 }
355
356 return ompi_mtl_ofi.comm_to_context[comm_id];
357 }
358
359 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t * ack_req,struct ompi_communicator_t * comm,fi_addr_t * src_addr,ompi_mtl_ofi_request_t * ofi_req,mca_mtl_ofi_endpoint_t * endpoint,uint64_t * match_bits,int tag)360 ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req,
361 struct ompi_communicator_t *comm,
362 fi_addr_t *src_addr,
363 ompi_mtl_ofi_request_t *ofi_req,
364 mca_mtl_ofi_endpoint_t *endpoint,
365 uint64_t *match_bits,
366 int tag)
367 {
368 ssize_t ret = OMPI_SUCCESS;
369 int ctxt_id = 0;
370
371 ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
372 set_thread_context(ctxt_id);
373
374 ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
375 assert(ack_req);
376
377 ack_req->parent = ofi_req;
378 ack_req->event_callback = ompi_mtl_ofi_send_ack_callback;
379 ack_req->error_callback = ompi_mtl_ofi_send_ack_error_callback;
380
381 ofi_req->completion_count += 1;
382
383 MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
384 NULL,
385 0,
386 NULL,
387 *src_addr,
388 *match_bits | ompi_mtl_ofi.sync_send_ack,
389 0, /* Exact match, no ignore bits */
390 (void *) &ack_req->ctx), ret);
391 if (OPAL_UNLIKELY(0 > ret)) {
392 opal_output_verbose(1, opal_common_ofi.output,
393 "%s:%d: fi_trecv failed: %s(%zd)",
394 __FILE__, __LINE__, fi_strerror(-ret), ret);
395 free(ack_req);
396 return ompi_mtl_ofi_get_error(ret);
397 }
398
399 /* The SYNC_SEND tag bit is set for the send operation only.*/
400 MTL_OFI_SET_SYNC_SEND(*match_bits);
401 return OMPI_SUCCESS;
402 }
403
404 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int dest,int tag,struct opal_convertor_t * convertor,mca_pml_base_send_mode_t mode,bool ofi_cq_data)405 ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl,
406 struct ompi_communicator_t *comm,
407 int dest,
408 int tag,
409 struct opal_convertor_t *convertor,
410 mca_pml_base_send_mode_t mode,
411 bool ofi_cq_data)
412 {
413 ssize_t ret = OMPI_SUCCESS;
414 ompi_mtl_ofi_request_t ofi_req;
415 int ompi_ret, ctxt_id = 0;
416 void *start;
417 bool free_after;
418 size_t length;
419 uint64_t match_bits;
420 ompi_proc_t *ompi_proc = NULL;
421 mca_mtl_ofi_endpoint_t *endpoint = NULL;
422 ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
423 fi_addr_t src_addr = 0;
424 fi_addr_t sep_peer_fiaddr = 0;
425
426 ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
427 set_thread_context(ctxt_id);
428
429 /**
430 * Create a send request, start it and wait until it completes.
431 */
432 ofi_req.event_callback = ompi_mtl_ofi_send_callback;
433 ofi_req.error_callback = ompi_mtl_ofi_send_error_callback;
434
435 ompi_proc = ompi_comm_peer_lookup(comm, dest);
436 endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
437
438 /* For Scalable Endpoints, gather target receive context */
439 sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
440
441 ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
442 if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
443 return ompi_ret;
444 }
445
446 ofi_req.buffer = (free_after) ? start : NULL;
447 ofi_req.length = length;
448 ofi_req.status.MPI_ERROR = OMPI_SUCCESS;
449 ofi_req.completion_count = 0;
450
451 if (OPAL_UNLIKELY(length > endpoint->mtl_ofi_module->max_msg_size)) {
452 opal_show_help("help-mtl-ofi.txt",
453 "message too big", false,
454 length, endpoint->mtl_ofi_module->max_msg_size);
455 return OMPI_ERROR;
456 }
457
458 if (ofi_cq_data) {
459 match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
460 src_addr = sep_peer_fiaddr;
461 } else {
462 match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
463 comm->c_my_rank, tag);
464 /* src_addr is ignored when FI_DIRECTED_RECV is not supported */
465 }
466
467 if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
468 ofi_req.status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &src_addr,
469 &ofi_req, endpoint,
470 &match_bits, tag);
471 if (OPAL_UNLIKELY(ofi_req.status.MPI_ERROR != OMPI_SUCCESS))
472 goto free_request_buffer;
473 }
474
475 if (ompi_mtl_ofi.max_inject_size >= length) {
476 if (ofi_cq_data) {
477 MTL_OFI_RETRY_UNTIL_DONE(fi_tinjectdata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
478 start,
479 length,
480 comm->c_my_rank,
481 sep_peer_fiaddr,
482 match_bits), ret);
483 } else {
484 MTL_OFI_RETRY_UNTIL_DONE(fi_tinject(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
485 start,
486 length,
487 sep_peer_fiaddr,
488 match_bits), ret);
489 }
490 if (OPAL_UNLIKELY(0 > ret)) {
491 MTL_OFI_LOG_FI_ERR(ret,
492 ofi_cq_data ? "fi_tinjectdata failed"
493 : "fi_tinject failed");
494 if (ack_req) {
495 fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, &ack_req->ctx);
496 free(ack_req);
497 }
498
499 ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
500 goto free_request_buffer;
501 }
502 } else {
503 ofi_req.completion_count += 1;
504 if (ofi_cq_data) {
505 MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
506 start,
507 length,
508 NULL,
509 comm->c_my_rank,
510 sep_peer_fiaddr,
511 match_bits,
512 (void *) &ofi_req.ctx), ret);
513 } else {
514 MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
515 start,
516 length,
517 NULL,
518 sep_peer_fiaddr,
519 match_bits,
520 (void *) &ofi_req.ctx), ret);
521 }
522 if (OPAL_UNLIKELY(0 > ret)) {
523 MTL_OFI_LOG_FI_ERR(ret,
524 ofi_cq_data ? "fi_tsenddata failed"
525 : "fi_tsend failed");
526 ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
527 goto free_request_buffer;
528 }
529 }
530
531 /**
532 * Wait until the request is completed.
533 * ompi_mtl_ofi_send_callback() updates this variable.
534 */
535 while (0 < ofi_req.completion_count) {
536 ompi_mtl_ofi_progress();
537 }
538
539 free_request_buffer:
540 if (OPAL_UNLIKELY(NULL != ofi_req.buffer)) {
541 free(ofi_req.buffer);
542 }
543
544 return ofi_req.status.MPI_ERROR;
545 }
546
547 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int dest,int tag,struct opal_convertor_t * convertor,mca_pml_base_send_mode_t mode,bool blocking,mca_mtl_request_t * mtl_request,bool ofi_cq_data)548 ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl,
549 struct ompi_communicator_t *comm,
550 int dest,
551 int tag,
552 struct opal_convertor_t *convertor,
553 mca_pml_base_send_mode_t mode,
554 bool blocking,
555 mca_mtl_request_t *mtl_request,
556 bool ofi_cq_data)
557 {
558 ssize_t ret = OMPI_SUCCESS;
559 ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t *) mtl_request;
560 int ompi_ret, ctxt_id = 0;
561 void *start;
562 size_t length;
563 bool free_after;
564 uint64_t match_bits;
565 ompi_proc_t *ompi_proc = NULL;
566 mca_mtl_ofi_endpoint_t *endpoint = NULL;
567 ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
568 fi_addr_t sep_peer_fiaddr = 0;
569
570 ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
571 set_thread_context(ctxt_id);
572
573 ofi_req->event_callback = ompi_mtl_ofi_isend_callback;
574 ofi_req->error_callback = ompi_mtl_ofi_send_error_callback;
575
576 ompi_proc = ompi_comm_peer_lookup(comm, dest);
577 endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
578
579 /* For Scalable Endpoints, gather target receive context */
580 sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
581
582 ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
583 if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) return ompi_ret;
584
585 ofi_req->buffer = (free_after) ? start : NULL;
586 ofi_req->length = length;
587 ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
588 ofi_req->completion_count = 1;
589
590 if (OPAL_UNLIKELY(length > endpoint->mtl_ofi_module->max_msg_size)) {
591 opal_show_help("help-mtl-ofi.txt",
592 "message too big", false,
593 length, endpoint->mtl_ofi_module->max_msg_size);
594 return OMPI_ERROR;
595 }
596
597 if (ofi_cq_data) {
598 match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
599 } else {
600 match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
601 comm->c_my_rank, tag);
602 /* src_addr is ignored when FI_DIRECTED_RECV is not supported */
603 }
604
605 if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
606 ofi_req->status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &sep_peer_fiaddr,
607 ofi_req, endpoint,
608 &match_bits, tag);
609 if (OPAL_UNLIKELY(ofi_req->status.MPI_ERROR != OMPI_SUCCESS))
610 goto free_request_buffer;
611 }
612
613 if (ofi_cq_data) {
614 MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
615 start,
616 length,
617 NULL,
618 comm->c_my_rank,
619 sep_peer_fiaddr,
620 match_bits,
621 (void *) &ofi_req->ctx), ret);
622 } else {
623 MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
624 start,
625 length,
626 NULL,
627 sep_peer_fiaddr,
628 match_bits,
629 (void *) &ofi_req->ctx), ret);
630 }
631 if (OPAL_UNLIKELY(0 > ret)) {
632 MTL_OFI_LOG_FI_ERR(ret,
633 ofi_cq_data ? "fi_tsenddata failed"
634 : "fi_tsend failed");
635 ofi_req->status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
636 }
637
638 free_request_buffer:
639 if (OPAL_UNLIKELY(OMPI_SUCCESS != ofi_req->status.MPI_ERROR
640 && NULL != ofi_req->buffer)) {
641 free(ofi_req->buffer);
642 }
643
644 return ofi_req->status.MPI_ERROR;
645 }
646
/**
 * Called when a completion for a posted recv is received.
 *
 * Fills the MPI status from the completion entry, unpacks into the user
 * buffer when a bounce buffer was used, and — when the sender used the
 * synchronous-send protocol — fires the zero-byte ack back to the sender
 * before completing the request.
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
                           ompi_mtl_ofi_request_t *ofi_req)
{
    int ompi_ret, ctxt_id = 0;
    ssize_t ret;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    int src = mtl_ofi_get_source(wc);
    ompi_status_public_t *status = NULL;
    struct fi_msg_tagged tagged_msg;

    ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(ofi_req->comm->c_contextid);

    assert(ofi_req->super.ompi_req);
    status = &ofi_req->super.ompi_req->req_status;

    /**
     * Any event associated with a request starts it.
     * This prevents a started request from being cancelled.
     */
    ofi_req->req_started = true;

    status->MPI_SOURCE = src;
    status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
    status->_ucount = wc->len;

    /* More data arrived than the posted receive can hold: truncation. */
    if (OPAL_UNLIKELY(wc->len > ofi_req->length)) {
        opal_output_verbose(1, opal_common_ofi.output,
                            "truncate expected: %ld %ld",
                            wc->len, ofi_req->length);
        status->MPI_ERROR = MPI_ERR_TRUNCATE;
    }

    /**
     * Unpack data into recv buffer if necessary.
     */
    if (OPAL_UNLIKELY(ofi_req->buffer)) {
        ompi_ret = ompi_mtl_datatype_unpack(ofi_req->convertor,
                                            ofi_req->buffer,
                                            wc->len);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
            opal_output_verbose(1, opal_common_ofi.output,
                                "%s:%d: ompi_mtl_datatype_unpack failed: %d",
                                __FILE__, __LINE__, ompi_ret);
            status->MPI_ERROR = ompi_ret;
        }
    }

    /**
     * We can only accept MTL_OFI_SYNC_SEND in the standard recv callback.
     * MTL_OFI_SYNC_SEND_ACK should only be received in the send_ack
     * callback.
     */
    assert(!MTL_OFI_IS_SYNC_SEND_ACK(wc->tag));

    /**
     * If this recv is part of an MPI_Ssend operation, then we send an
     * acknowledgment back to the sender.
     * The ack message is sent without generating a completion event in
     * the completion queue by not setting FI_COMPLETION in the flags to
     * fi_tsendmsg(FI_SELECTIVE_COMPLETION).
     * This is done since the 0 byte message requires no
     * notification on the send side for a successful completion.
     * If a failure occurs the provider will notify the error
     * in the cq_readerr during OFI progress. Once the message has been
     * successfully processed the request is marked as completed.
     */
    if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
        /**
         * If the recv request was posted for any source,
         * we need to extract the source's actual address.
         */
        if (ompi_mtl_ofi.any_addr == ofi_req->remote_addr) {
            ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
            endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
            ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
        }

        tagged_msg.msg_iov = NULL;
        tagged_msg.desc = NULL;
        tagged_msg.iov_count = 0;
        tagged_msg.addr = ofi_req->remote_addr;
        /**
         * We must continue to use the user's original tag but remove the
         * sync_send protocol tag bit and instead apply the sync_send_ack
         * tag bit to complete the initiator's sync send receive.
         */
        tagged_msg.tag = (wc->tag | ompi_mtl_ofi.sync_send_ack) & ~ompi_mtl_ofi.sync_send;
        tagged_msg.context = NULL;
        tagged_msg.data = 0;

        MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
                                             &tagged_msg, 0), ret);
        if (OPAL_UNLIKELY(0 > ret)) {
            MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed");
            status->MPI_ERROR = OMPI_ERROR;
        }
    }

    ofi_req->super.completion_callback(&ofi_req->super);

    return OMPI_SUCCESS;
}
754
755 /**
756 * Called when an error occured on a recv request.
757 */
758 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)759 ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry *error,
760 ompi_mtl_ofi_request_t *ofi_req)
761 {
762 ompi_status_public_t *status;
763 assert(ofi_req->super.ompi_req);
764 status = &ofi_req->super.ompi_req->req_status;
765 status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
766 status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error);
767
768 switch (error->err) {
769 case FI_ETRUNC:
770 status->MPI_ERROR = MPI_ERR_TRUNCATE;
771 break;
772 case FI_ECANCELED:
773 status->_cancelled = true;
774 break;
775 default:
776 status->MPI_ERROR = MPI_ERR_INTERN;
777 }
778
779 ofi_req->super.completion_callback(&ofi_req->super);
780 return OMPI_SUCCESS;
781 }
782
/*
 * Post a non-blocking tagged receive.
 *
 * Builds the match/mask bits for (comm, src, tag) — via the CQ-data or
 * tag-encoded protocol depending on 'ofi_cq_data' — prepares the receive
 * buffer (possibly a bounce buffer for non-contiguous datatypes), records
 * the request state, and posts fi_trecv. Completion/error are delivered
 * through ompi_mtl_ofi_recv_callback / ompi_mtl_ofi_recv_error_callback.
 *
 * Returns OMPI_SUCCESS or an OMPI error code (buffer freed on failure).
 */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_irecv_generic(struct mca_mtl_base_module_t *mtl,
                           struct ompi_communicator_t *comm,
                           int src,
                           int tag,
                           struct opal_convertor_t *convertor,
                           mca_mtl_request_t *mtl_request,
                           bool ofi_cq_data)
{
    int ompi_ret = OMPI_SUCCESS, ctxt_id = 0;
    ssize_t ret;
    uint64_t match_bits, mask_bits;
    fi_addr_t remote_addr = ompi_mtl_ofi.any_addr;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
    void *start;
    size_t length;
    bool free_after;

    ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
    set_thread_context(ctxt_id);

    if (ofi_cq_data) {
        /* Directed receive: resolve the sender's fabric address unless the
         * receive is posted for any source. */
        if (MPI_ANY_SOURCE != src) {
            ompi_proc = ompi_comm_peer_lookup(comm, src);
            endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
            remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
        }

        mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
                                    tag);
    } else {
        mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
                                tag);
        /* src_addr is ignored when FI_DIRECTED_RECV is not used */
    }

    ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
                                          &start,
                                          &length,
                                          &free_after);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
        return ompi_ret;
    }

    /* Record everything the completion callbacks will need. */
    ofi_req->type = OMPI_MTL_OFI_RECV;
    ofi_req->event_callback = ompi_mtl_ofi_recv_callback;
    ofi_req->error_callback = ompi_mtl_ofi_recv_error_callback;
    ofi_req->comm = comm;
    ofi_req->buffer = (free_after) ? start : NULL;
    ofi_req->length = length;
    ofi_req->convertor = convertor;
    ofi_req->req_started = false;
    ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
    ofi_req->remote_addr = remote_addr;
    ofi_req->match_bits = match_bits;

    MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
                                      start,
                                      length,
                                      NULL,
                                      remote_addr,
                                      match_bits,
                                      mask_bits,
                                      (void *)&ofi_req->ctx), ret);
    if (OPAL_UNLIKELY(0 > ret)) {
        /* Release the bounce buffer before reporting the failure. */
        if (NULL != ofi_req->buffer) {
            free(ofi_req->buffer);
        }
        MTL_OFI_LOG_FI_ERR(ret, "fi_trecv failed");
        return ompi_mtl_ofi_get_error(ret);
    }

    return OMPI_SUCCESS;
}
859
860 /**
861 * Called when a mrecv request completes.
862 */
863 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)864 ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
865 ompi_mtl_ofi_request_t *ofi_req)
866 {
867 struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
868 ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
869 status->MPI_SOURCE = mtl_ofi_get_source(wc);
870 status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
871 status->MPI_ERROR = MPI_SUCCESS;
872 status->_ucount = wc->len;
873
874 free(ofi_req);
875
876 mrecv_req->completion_callback(mrecv_req);
877
878 return OMPI_SUCCESS;
879 }
880
881 /**
882 * Called when an error occured on a mrecv request.
883 */
884 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_mrecv_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)885 ompi_mtl_ofi_mrecv_error_callback(struct fi_cq_err_entry *error,
886 ompi_mtl_ofi_request_t *ofi_req)
887 {
888 struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
889 ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
890 status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
891 status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error);
892
893 switch (error->err) {
894 case FI_ETRUNC:
895 status->MPI_ERROR = MPI_ERR_TRUNCATE;
896 break;
897 case FI_ECANCELED:
898 status->_cancelled = true;
899 break;
900 default:
901 status->MPI_ERROR = MPI_ERR_INTERN;
902 }
903
904 free(ofi_req);
905
906 mrecv_req->completion_callback(mrecv_req);
907
908 return OMPI_SUCCESS;
909 }
910
911 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t * mtl,struct opal_convertor_t * convertor,struct ompi_message_t ** message,struct mca_mtl_request_t * mtl_request)912 ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
913 struct opal_convertor_t *convertor,
914 struct ompi_message_t **message,
915 struct mca_mtl_request_t *mtl_request)
916 {
917 ompi_mtl_ofi_request_t *ofi_req =
918 (ompi_mtl_ofi_request_t *)(*message)->req_ptr;
919 void *start;
920 size_t length;
921 bool free_after;
922 struct iovec iov;
923 struct fi_msg_tagged msg;
924 int ompi_ret, ctxt_id = 0;
925 ssize_t ret;
926 uint64_t msgflags = FI_CLAIM | FI_COMPLETION;
927 struct ompi_communicator_t *comm = (*message)->comm;
928
929 ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
930 set_thread_context(ctxt_id);
931
932 ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
933 &start,
934 &length,
935 &free_after);
936 if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
937 return ompi_ret;
938 }
939
940 ofi_req->type = OMPI_MTL_OFI_RECV;
941 ofi_req->event_callback = ompi_mtl_ofi_mrecv_callback;
942 ofi_req->error_callback = ompi_mtl_ofi_mrecv_error_callback;
943 ofi_req->buffer = (free_after) ? start : NULL;
944 ofi_req->length = length;
945 ofi_req->convertor = convertor;
946 ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
947 ofi_req->mrecv_req = mtl_request;
948
949 /**
950 * fi_trecvmsg with FI_CLAIM
951 */
952 iov.iov_base = start;
953 iov.iov_len = length;
954 msg.msg_iov = &iov;
955 msg.desc = NULL;
956 msg.iov_count = 1;
957 msg.addr = 0;
958 msg.tag = ofi_req->match_bits;
959 msg.ignore = ofi_req->mask_bits;
960 msg.context = (void *)&ofi_req->ctx;
961 msg.data = 0;
962
963 MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
964 if (OPAL_UNLIKELY(0 > ret)) {
965 MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
966 return ompi_mtl_ofi_get_error(ret);
967 }
968
969 *message = MPI_MESSAGE_NULL;
970
971 return OMPI_SUCCESS;
972 }
973
974 /**
975 * Called when a probe request completes.
976 */
977 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry * wc,ompi_mtl_ofi_request_t * ofi_req)978 ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry *wc,
979 ompi_mtl_ofi_request_t *ofi_req)
980 {
981 ofi_req->match_state = 1;
982 ofi_req->match_bits = wc->tag;
983 ofi_req->status.MPI_SOURCE = mtl_ofi_get_source(wc);
984 ofi_req->status.MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
985 ofi_req->status.MPI_ERROR = MPI_SUCCESS;
986 ofi_req->status._ucount = wc->len;
987 ofi_req->completion_count--;
988
989 return OMPI_SUCCESS;
990 }
991
992 /**
993 * Called when a probe request encounters an error.
994 */
995 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_probe_error_callback(struct fi_cq_err_entry * error,ompi_mtl_ofi_request_t * ofi_req)996 ompi_mtl_ofi_probe_error_callback(struct fi_cq_err_entry *error,
997 ompi_mtl_ofi_request_t *ofi_req)
998 {
999 ofi_req->completion_count--;
1000
1001 /*
1002 * Receives posted with FI_PEEK and friends will get an error
1003 * completion with FI_ENOMSG. This just indicates the lack of a match for
1004 * the probe and is not an error case. All other error cases are
1005 * provider-internal errors and should be flagged as such.
1006 */
1007 if (error->err == FI_ENOMSG)
1008 return OMPI_SUCCESS;
1009
1010 ofi_req->status.MPI_ERROR = MPI_ERR_INTERN;
1011
1012 return OMPI_ERROR;
1013 }
1014
1015 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_iprobe_generic(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int src,int tag,int * flag,struct ompi_status_public_t * status,bool ofi_cq_data)1016 ompi_mtl_ofi_iprobe_generic(struct mca_mtl_base_module_t *mtl,
1017 struct ompi_communicator_t *comm,
1018 int src,
1019 int tag,
1020 int *flag,
1021 struct ompi_status_public_t *status,
1022 bool ofi_cq_data)
1023 {
1024 struct ompi_mtl_ofi_request_t ofi_req;
1025 ompi_proc_t *ompi_proc = NULL;
1026 mca_mtl_ofi_endpoint_t *endpoint = NULL;
1027 fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
1028 uint64_t match_bits, mask_bits;
1029 ssize_t ret;
1030 struct fi_msg_tagged msg;
1031 uint64_t msgflags = FI_PEEK | FI_COMPLETION;
1032 int ctxt_id = 0;
1033
1034 ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
1035 set_thread_context(ctxt_id);
1036
1037 if (ofi_cq_data) {
1038 /* If the source is known, use its peer_fiaddr. */
1039 if (MPI_ANY_SOURCE != src) {
1040 ompi_proc = ompi_comm_peer_lookup( comm, src );
1041 endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
1042 remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1043 }
1044
1045 mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
1046 tag);
1047 }
1048 else {
1049 mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
1050 tag);
1051 /* src_addr is ignored when FI_DIRECTED_RECV is not used */
1052 }
1053
1054 /**
1055 * fi_trecvmsg with FI_PEEK:
1056 * Initiate a search for a match in the hardware or software queue.
1057 * If successful, libfabric will enqueue a context entry into the completion
1058 * queue to make the search nonblocking. This code will poll until the
1059 * entry is enqueued.
1060 */
1061 msg.msg_iov = NULL;
1062 msg.desc = NULL;
1063 msg.iov_count = 0;
1064 msg.addr = remote_proc;
1065 msg.tag = match_bits;
1066 msg.ignore = mask_bits;
1067 msg.context = (void *)&ofi_req.ctx;
1068 msg.data = 0;
1069
1070 ofi_req.type = OMPI_MTL_OFI_PROBE;
1071 ofi_req.event_callback = ompi_mtl_ofi_probe_callback;
1072 ofi_req.error_callback = ompi_mtl_ofi_probe_error_callback;
1073 ofi_req.completion_count = 1;
1074 ofi_req.match_state = 0;
1075
1076 MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
1077 if (OPAL_UNLIKELY(0 > ret)) {
1078 MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
1079 return ompi_mtl_ofi_get_error(ret);
1080 }
1081
1082 while (0 < ofi_req.completion_count) {
1083 opal_progress();
1084 }
1085
1086 *flag = ofi_req.match_state;
1087 if (1 == *flag) {
1088 if (MPI_STATUS_IGNORE != status) {
1089 *status = ofi_req.status;
1090 }
1091 }
1092
1093 return OMPI_SUCCESS;
1094 }
1095
1096 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_improbe_generic(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm,int src,int tag,int * matched,struct ompi_message_t ** message,struct ompi_status_public_t * status,bool ofi_cq_data)1097 ompi_mtl_ofi_improbe_generic(struct mca_mtl_base_module_t *mtl,
1098 struct ompi_communicator_t *comm,
1099 int src,
1100 int tag,
1101 int *matched,
1102 struct ompi_message_t **message,
1103 struct ompi_status_public_t *status,
1104 bool ofi_cq_data)
1105 {
1106 struct ompi_mtl_ofi_request_t *ofi_req;
1107 ompi_proc_t *ompi_proc = NULL;
1108 mca_mtl_ofi_endpoint_t *endpoint = NULL;
1109 fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
1110 uint64_t match_bits, mask_bits;
1111 ssize_t ret;
1112 struct fi_msg_tagged msg;
1113 uint64_t msgflags = FI_PEEK | FI_CLAIM | FI_COMPLETION;
1114 int ctxt_id = 0;
1115
1116 ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
1117 set_thread_context(ctxt_id);
1118
1119 ofi_req = malloc(sizeof *ofi_req);
1120 if (NULL == ofi_req) {
1121 return OMPI_ERROR;
1122 }
1123
1124 /**
1125 * If the source is known, use its peer_fiaddr.
1126 */
1127
1128 if (ofi_cq_data) {
1129 if (MPI_ANY_SOURCE != src) {
1130 ompi_proc = ompi_comm_peer_lookup( comm, src );
1131 endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
1132 remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1133 }
1134
1135 mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
1136 tag);
1137 }
1138 else {
1139 /* src_addr is ignored when FI_DIRECTED_RECV is not used */
1140 mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
1141 tag);
1142 }
1143
1144 /**
1145 * fi_trecvmsg with FI_PEEK and FI_CLAIM:
1146 * Initiate a search for a match in the hardware or software queue.
1147 * If successful, libfabric will enqueue a context entry into the completion
1148 * queue to make the search nonblocking. This code will poll until the
1149 * entry is enqueued.
1150 */
1151 msg.msg_iov = NULL;
1152 msg.desc = NULL;
1153 msg.iov_count = 0;
1154 msg.addr = remote_proc;
1155 msg.tag = match_bits;
1156 msg.ignore = mask_bits;
1157 msg.context = (void *)&ofi_req->ctx;
1158 msg.data = 0;
1159
1160 ofi_req->type = OMPI_MTL_OFI_PROBE;
1161 ofi_req->event_callback = ompi_mtl_ofi_probe_callback;
1162 ofi_req->error_callback = ompi_mtl_ofi_probe_error_callback;
1163 ofi_req->completion_count = 1;
1164 ofi_req->match_state = 0;
1165 ofi_req->mask_bits = mask_bits;
1166
1167 MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
1168 if (OPAL_UNLIKELY(0 > ret)) {
1169 MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
1170 free(ofi_req);
1171 return ompi_mtl_ofi_get_error(ret);
1172 }
1173
1174 while (0 < ofi_req->completion_count) {
1175 opal_progress();
1176 }
1177
1178 *matched = ofi_req->match_state;
1179 if (1 == *matched) {
1180 if (MPI_STATUS_IGNORE != status) {
1181 *status = ofi_req->status;
1182 }
1183
1184 (*message) = ompi_message_alloc();
1185 if (NULL == (*message)) {
1186 return OMPI_ERR_OUT_OF_RESOURCE;
1187 }
1188
1189 (*message)->comm = comm;
1190 (*message)->req_ptr = ofi_req;
1191 (*message)->peer = ofi_req->status.MPI_SOURCE;
1192 (*message)->count = ofi_req->status._ucount;
1193
1194 } else {
1195 (*message) = MPI_MESSAGE_NULL;
1196 free(ofi_req);
1197 }
1198
1199 return OMPI_SUCCESS;
1200 }
1201
1202 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t * mtl,mca_mtl_request_t * mtl_request,int flag)1203 ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t *mtl,
1204 mca_mtl_request_t *mtl_request,
1205 int flag)
1206 {
1207 int ret, ctxt_id = 0;
1208 ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
1209
1210 ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(ofi_req->comm->c_contextid);
1211
1212 switch (ofi_req->type) {
1213 case OMPI_MTL_OFI_SEND:
1214 /**
1215 * Cannot cancel sends yet
1216 */
1217 break;
1218
1219 case OMPI_MTL_OFI_RECV:
1220 /**
1221 * Cancel a receive request only if it hasn't been matched yet.
1222 * The event queue needs to be drained to make sure there isn't
1223 * any pending receive completion event.
1224 */
1225 ompi_mtl_ofi_progress();
1226
1227 if (!ofi_req->req_started) {
1228 ret = fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
1229 &ofi_req->ctx);
1230 if (0 == ret) {
1231 if (ofi_req->req_started)
1232 goto ofi_cancel_not_possible;
1233 } else {
1234 ofi_cancel_not_possible:
1235 /**
1236 * Could not cancel the request.
1237 */
1238 ofi_req->super.ompi_req->req_status._cancelled = false;
1239 }
1240 }
1241 break;
1242
1243 default:
1244 return OMPI_ERROR;
1245 }
1246
1247 return OMPI_SUCCESS;
1248 }
1249
/*
 * Create and initialize the OFI context (TX/RX endpoints, CQ, and lock)
 * that will service the given communicator.
 *
 * For regular endpoints only the per-context lock is constructed and the
 * progress function registered; for scalable endpoints a new TX/RX
 * context pair and CQ are carved out of the SEP, bound together, and
 * enabled.  Returns OMPI_SUCCESS, or a libfabric error code on failure
 * (partially created objects are closed before returning).
 */
static int ompi_mtl_ofi_init_contexts(struct mca_mtl_base_module_t *mtl,
                                      struct ompi_communicator_t *comm,
                                      mca_mtl_ofi_ep_type ep_type)
{
    int ret;
    int ctxt_id = ompi_mtl_ofi.total_ctxts_used;
    struct fi_cq_attr cq_attr = {0};
    cq_attr.format = FI_CQ_FORMAT_TAGGED;
    cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;

    if (OFI_REGULAR_EP == ep_type) {
        /*
         * For regular endpoints, just create the Lock object and register
         * progress function.
         */
        goto init_regular_ep;
    }

    /*
     * We only create up to the max number of contexts asked for by the
     * user.  If the user enables the thread-grouping feature and creates
     * more communicators than there are available contexts, we set the
     * threshold context_id so that new communicators created beyond the
     * threshold are assigned to contexts in a round-robin fashion.
     */
    if (ompi_mtl_ofi.num_ofi_contexts <= ompi_mtl_ofi.total_ctxts_used) {
        ompi_mtl_ofi.comm_to_context[comm->c_contextid] = comm->c_contextid %
                                                          ompi_mtl_ofi.total_ctxts_used;
        if (!ompi_mtl_ofi.threshold_comm_context_id) {
            ompi_mtl_ofi.threshold_comm_context_id = comm->c_contextid;

            /* Warn once when the context limit is first exceeded. */
            opal_show_help("help-mtl-ofi.txt", "SEP thread grouping ctxt limit", true, ctxt_id,
                           ompi_process_info.nodename, __FILE__, __LINE__);
        }

        return OMPI_SUCCESS;
    }

    /* Init context info for Scalable EPs: allocate a TX and an RX
     * context from the scalable endpoint for this ctxt_id. */
    ret = fi_tx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, NULL);
    if (ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_tx_context failed");
        goto init_error;
    }

    ret = fi_rx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, NULL);
    if (ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_rx_context failed");
        goto init_error;
    }

    /* One tagged-format CQ services both directions of this context. */
    ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, NULL);
    if (ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_cq_open failed");
        goto init_error;
    }

    /* Bind CQ to TX/RX context object */
    ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
                     FI_TRANSMIT | FI_SELECTIVE_COMPLETION);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_TRANSMIT) failed");
        goto init_error;
    }

    ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
                     FI_RECV | FI_SELECTIVE_COMPLETION);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_RECV) failed");
        goto init_error;
    }

    /* Enable Endpoint for communication. This commits the bind operations */
    ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_enable (send context) failed");
        goto init_error;
    }

    ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
    if (0 != ret) {
        MTL_OFI_LOG_FI_ERR(ret, "fi_enable (recv context) failed");
        goto init_error;
    }

init_regular_ep:
    /* Initialize per-context lock */
    OBJ_CONSTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock, opal_mutex_t);

    /* Register the progress callback once, when the first context (the
     * one for MPI_COMM_WORLD) is created. */
    if (MPI_COMM_WORLD == comm) {
        ret = opal_progress_register(ompi_mtl_ofi_progress_no_inline);
        if (OMPI_SUCCESS != ret) {
            opal_output_verbose(1, opal_common_ofi.output,
                                "%s:%d: opal_progress_register failed: %d\n",
                                __FILE__, __LINE__, ret);
            goto init_error;
        }
    }

    ompi_mtl_ofi.comm_to_context[comm->c_contextid] = ompi_mtl_ofi.total_ctxts_used;
    ompi_mtl_ofi.total_ctxts_used++;

    return OMPI_SUCCESS;

init_error:
    /* Close whatever was created before the failure. */
    if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep) {
        (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
    }

    if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep) {
        (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
    }

    if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq) {
        (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq);
    }

    return ret;
}
1369
/**
 * Tear down the OFI context objects associated with a communicator.
 *
 * Regular EPs only destruct the per-context lock: their TX/RX contexts
 * alias the SEP object, which is closed in ompi_mtl_ofi_finalize().
 * Scalable EPs additionally close their private TX/RX contexts and CQ.
 */
static int ompi_mtl_ofi_finalize_contexts(struct mca_mtl_base_module_t *mtl,
                                          struct ompi_communicator_t *comm,
                                          mca_mtl_ofi_ep_type ep_type)
{
    int rc = OMPI_SUCCESS;
    int ctxt_id = 0;

    if (OFI_REGULAR_EP != ep_type) {
        /*
         * Communicators beyond the thread-grouping threshold share a
         * context round-robin; the shared context is owned (and torn
         * down) elsewhere, so there is nothing to release here.
         */
        if (ompi_mtl_ofi.thread_grouping &&
            ompi_mtl_ofi.threshold_comm_context_id &&
            ((uint32_t) ompi_mtl_ofi.threshold_comm_context_id <= comm->c_contextid)) {
            return OMPI_SUCCESS;
        }

        ctxt_id = ompi_mtl_ofi.thread_grouping ?
                  ompi_mtl_ofi.comm_to_context[comm->c_contextid] : 0;

        /* Close TX context, RX context, and CQ in order; stop at the
         * first failure. */
        if ((rc = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep)) ||
            (rc = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep)) ||
            (rc = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq))) {
            opal_show_help("help-mtl-ofi.txt", "OFI call fail", true,
                           "fi_close",
                           ompi_process_info.nodename, __FILE__, __LINE__,
                           fi_strerror(-rc), rc);
            return OMPI_ERROR;
        }
    }

    /* Destroy the per-context lock in all cases. */
    OBJ_DESTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock);

    return OMPI_SUCCESS;
}
1420
1421 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm)1422 ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl,
1423 struct ompi_communicator_t *comm)
1424 {
1425 int ret;
1426 mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ?
1427 OFI_REGULAR_EP : OFI_SCALABLE_EP;
1428
1429 /*
1430 * If thread grouping enabled, add new OFI context for each communicator
1431 * other than MPI_COMM_SELF.
1432 */
1433 if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
1434 /* If no thread grouping, add new OFI context only
1435 * for MPI_COMM_WORLD.
1436 */
1437 (!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
1438
1439 ret = ompi_mtl_ofi_init_contexts(mtl, comm, ep_type);
1440
1441 if (OMPI_SUCCESS != ret) {
1442 goto error;
1443 }
1444 }
1445
1446 return OMPI_SUCCESS;
1447
1448 error:
1449 return OMPI_ERROR;
1450 }
1451
1452 __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t * mtl,struct ompi_communicator_t * comm)1453 ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl,
1454 struct ompi_communicator_t *comm)
1455 {
1456 int ret = OMPI_SUCCESS;
1457 mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ?
1458 OFI_REGULAR_EP : OFI_SCALABLE_EP;
1459
1460 /*
1461 * Clean up OFI contexts information.
1462 */
1463 if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
1464 (!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
1465
1466 ret = ompi_mtl_ofi_finalize_contexts(mtl, comm, ep_type);
1467 }
1468
1469 return ret;
1470 }
1471
1472 #ifdef MCA_ompi_mtl_DIRECT_CALL
1473
/* Direct-call MPI_Send entry point: forwards to the generic
 * implementation with the component's configured CQ-data mode. */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
                  struct ompi_communicator_t *comm,
                  int dest,
                  int tag,
                  struct opal_convertor_t *convertor,
                  mca_pml_base_send_mode_t mode)
{
    return ompi_mtl_ofi_send_generic(mtl, comm, dest, tag,
                                     convertor, mode,
                                     ompi_mtl_ofi.fi_cq_data);
}
1486
/* Direct-call MPI_Isend entry point: forwards to the generic
 * implementation with the component's configured CQ-data mode. */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
                   struct ompi_communicator_t *comm,
                   int dest,
                   int tag,
                   struct opal_convertor_t *convertor,
                   mca_pml_base_send_mode_t mode,
                   bool blocking,
                   mca_mtl_request_t *mtl_request)
{
    return ompi_mtl_ofi_isend_generic(mtl, comm, dest, tag,
                                      convertor, mode, blocking, mtl_request,
                                      ompi_mtl_ofi.fi_cq_data);
}
1501
/* Direct-call MPI_Irecv entry point: forwards to the generic
 * implementation with the component's configured CQ-data mode. */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
                   struct ompi_communicator_t *comm,
                   int src,
                   int tag,
                   struct opal_convertor_t *convertor,
                   mca_mtl_request_t *mtl_request)
{
    return ompi_mtl_ofi_irecv_generic(mtl, comm, src, tag,
                                      convertor, mtl_request,
                                      ompi_mtl_ofi.fi_cq_data);
}
1514
/* Direct-call MPI_Iprobe entry point: forwards to the generic
 * implementation with the component's configured CQ-data mode. */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
                    struct ompi_communicator_t *comm,
                    int src,
                    int tag,
                    int *flag,
                    struct ompi_status_public_t *status)
{
    return ompi_mtl_ofi_iprobe_generic(mtl, comm, src, tag,
                                       flag, status,
                                       ompi_mtl_ofi.fi_cq_data);
}
1527
/* Direct-call MPI_Improbe entry point: forwards to the generic
 * implementation with the component's configured CQ-data mode. */
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
                     struct ompi_communicator_t *comm,
                     int src,
                     int tag,
                     int *matched,
                     struct ompi_message_t **message,
                     struct ompi_status_public_t *status)
{
    return ompi_mtl_ofi_improbe_generic(mtl, comm, src, tag,
                                        matched, message, status,
                                        ompi_mtl_ofi.fi_cq_data);
}
1541 #endif
1542
1543 END_C_DECLS
1544
1545 #endif /* MTL_OFI_H_HAS_BEEN_INCLUDED */
1546