1 /*
2 * Copyright (c) 2019-2020 Amazon.com, Inc. or its affiliates.
3 * All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses. You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * BSD license below:
10 *
11 * Redistribution and use in source and binary forms, with or
12 * without modification, are permitted provided that the following
13 * conditions are met:
14 *
15 * - Redistributions of source code must retain the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials
22 * provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33
34 #include <inttypes.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include "ofi.h"
38 #include <ofi_util.h>
39 #include <ofi_iov.h>
40
41 #include "efa.h"
42 #include "rxr.h"
43 #include "rxr_msg.h"
44 #include "rxr_pkt_cmd.h"
45
46 /**
47 * This file define the msg ops functions.
48 * It is consisted of the following sections:
49 * send functions,
50 * receive functions and
51 * ops structure
52 */
53
54 /**
55 * Send function
56 */
57
58 /**
59 * Utility functions used by both non-tagged and tagged send.
60 */
/**
 * Post a request-to-message (RTM) packet for a tx_entry, choosing the
 * protocol by message size and peer capabilities:
 *   eager  — payload fits in one RTM packet,
 *   medium — payload within rxr_env.efa_max_medium_msg_size,
 *   read   — peer supports RDMA read and a MR desc is (or can be) available,
 *   long   — fallback, requires TX credits.
 *
 * @param rxr_ep	endpoint owning the tx_entry
 * @param tx_entry	send operation; op must be ofi_op_msg or ofi_op_tagged
 * @return 0 on success, negative libfabric error code on failure
 */
ssize_t rxr_msg_post_rtm(struct rxr_ep *rxr_ep, struct rxr_tx_entry *tx_entry)
{
	/*
	 * For performance considerations, this function assumes the tagged RTM
	 * packet type id is always the corresponding message RTM packet type
	 * id + 1 (so "+ tagged" below selects the tagged variant), thus the
	 * assertions here.
	 */
	assert(RXR_EAGER_MSGRTM_PKT + 1 == RXR_EAGER_TAGRTM_PKT);
	assert(RXR_READ_MSGRTM_PKT + 1 == RXR_READ_TAGRTM_PKT);
	assert(RXR_LONG_MSGRTM_PKT + 1 == RXR_LONG_TAGRTM_PKT);
	assert(RXR_MEDIUM_MSGRTM_PKT + 1 == RXR_MEDIUM_TAGRTM_PKT);

	int tagged;
	size_t max_rtm_data_size;
	ssize_t err;
	struct rxr_peer *peer;

	assert(tx_entry->op == ofi_op_msg || tx_entry->op == ofi_op_tagged);
	tagged = (tx_entry->op == ofi_op_tagged);
	assert(tagged == 0 || tagged == 1);

	max_rtm_data_size = rxr_pkt_req_max_data_size(rxr_ep,
						      tx_entry->addr,
						      RXR_EAGER_MSGRTM_PKT + tagged);

	peer = rxr_ep_get_peer(rxr_ep, tx_entry->addr);

	if (peer->is_local) {
		assert(rxr_ep->use_shm);
		/* intra instance message: eager or read over shm only */
		int rtm_type = (tx_entry->total_len <= max_rtm_data_size) ? RXR_EAGER_MSGRTM_PKT
									  : RXR_READ_MSGRTM_PKT;

		return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry, rtm_type + tagged, 0);
	}

	/* inter instance message */
	if (tx_entry->total_len <= max_rtm_data_size)
		return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
						  RXR_EAGER_MSGRTM_PKT + tagged, 0);

	if (tx_entry->total_len <= rxr_env.efa_max_medium_msg_size) {
		/* we do not check the return value of rxr_ep_tx_init_mr_desc()
		 * because medium message works even if MR registration failed
		 */
		if (efa_mr_cache_enable)
			rxr_ep_tx_init_mr_desc(rxr_ep_domain(rxr_ep),
					       tx_entry, 0, FI_SEND);
		return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
						  RXR_MEDIUM_MSGRTM_PKT + tagged, 0);
	}

	if (efa_both_support_rdma_read(rxr_ep, peer) &&
	    (tx_entry->desc[0] || efa_mr_cache_enable)) {
		/* use read message protocol */
		err = rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
						 RXR_READ_MSGRTM_PKT + tagged, 0);

		if (err != -FI_ENOMEM)
			return err;

		/*
		 * If memory registration failed (-FI_ENOMEM), we continue here
		 * and fall back to use long message protocol
		 */
	}

	/* long protocol needs TX credits before it can be posted */
	err = rxr_ep_set_tx_credit_request(rxr_ep, tx_entry);
	if (OFI_UNLIKELY(err))
		return err;

	return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
					  RXR_LONG_MSGRTM_PKT + tagged, 0);
}
134
/**
 * Common send path shared by all tagged and untagged send ops.
 *
 * Allocates a tx_entry, assigns the peer's next message id, and posts
 * an RTM via rxr_msg_post_rtm(). Serialized under the util_ep lock.
 *
 * @param ep	fid of the RxR endpoint
 * @param msg	message descriptor (iov, desc, addr, context, data)
 * @param tag	tag to send; unused unless op == ofi_op_tagged
 * @param op	ofi_op_msg or ofi_op_tagged
 * @param flags	libfabric send flags
 * @return 0 on success; negative libfabric error code on failure
 *	   (-FI_EAGAIN when TX resources are exhausted)
 */
ssize_t rxr_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg,
			     uint64_t tag, uint32_t op, uint64_t flags)
{
	struct rxr_ep *rxr_ep;
	ssize_t err;
	struct rxr_tx_entry *tx_entry;
	struct rxr_peer *peer;

	FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
	       "iov_len: %lu tag: %lx op: %x flags: %lx\n",
	       ofi_total_iov_len(msg->msg_iov, msg->iov_count),
	       tag, op, flags);

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
	assert(msg->iov_count <= rxr_ep->tx_iov_limit);

	rxr_perfset_start(rxr_ep, perf_rxr_tx);
	fastlock_acquire(&rxr_ep->util_ep.lock);

	if (OFI_UNLIKELY(is_tx_res_full(rxr_ep))) {
		err = -FI_EAGAIN;
		goto out;
	}

	tx_entry = rxr_ep_alloc_tx_entry(rxr_ep, msg, op, tag, flags);

	if (OFI_UNLIKELY(!tx_entry)) {
		/* no tx_entry available; progress the EP to free some up */
		err = -FI_EAGAIN;
		rxr_ep_progress_internal(rxr_ep);
		goto out;
	}

	assert(tx_entry->op == ofi_op_msg || tx_entry->op == ofi_op_tagged);

	peer = rxr_ep_get_peer(rxr_ep, tx_entry->addr);
	assert(peer);
	/* per-peer message ids must stay contiguous: roll back on failure */
	tx_entry->msg_id = peer->next_msg_id++;
	err = rxr_msg_post_rtm(rxr_ep, tx_entry);
	if (OFI_UNLIKELY(err)) {
		rxr_release_tx_entry(rxr_ep, tx_entry);
		peer->next_msg_id--;
	}

out:
	fastlock_release(&rxr_ep->util_ep.lock);
	rxr_perfset_end(rxr_ep, perf_rxr_tx);
	return err;
}
183
184 /**
185 * Non-tagged send ops function
186 */
static
ssize_t rxr_msg_sendmsg(struct fid_ep *ep, const struct fi_msg *msg,
			uint64_t flags)
{
	/* Untagged sendmsg: tag is unused (0) for ofi_op_msg. */
	return rxr_msg_generic_send(ep, msg, 0, ofi_op_msg, flags);
}
193
194 static
rxr_msg_sendv(struct fid_ep * ep,const struct iovec * iov,void ** desc,size_t count,fi_addr_t dest_addr,void * context)195 ssize_t rxr_msg_sendv(struct fid_ep *ep, const struct iovec *iov,
196 void **desc, size_t count, fi_addr_t dest_addr,
197 void *context)
198 {
199 struct rxr_ep *rxr_ep;
200 struct fi_msg msg;
201
202 memset(&msg, 0, sizeof(msg));
203 msg.msg_iov = iov;
204 msg.desc = desc;
205 msg.iov_count = count;
206 msg.addr = dest_addr;
207 msg.context = context;
208
209 rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
210 return rxr_msg_sendmsg(ep, &msg, rxr_tx_flags(rxr_ep));
211 }
212
213 static
rxr_msg_send(struct fid_ep * ep,const void * buf,size_t len,void * desc,fi_addr_t dest_addr,void * context)214 ssize_t rxr_msg_send(struct fid_ep *ep, const void *buf, size_t len,
215 void *desc, fi_addr_t dest_addr, void *context)
216 {
217 struct iovec iov;
218
219 iov.iov_base = (void *)buf;
220 iov.iov_len = len;
221 return rxr_msg_sendv(ep, &iov, desc, 1, dest_addr, context);
222 }
223
224 static
rxr_msg_senddata(struct fid_ep * ep,const void * buf,size_t len,void * desc,uint64_t data,fi_addr_t dest_addr,void * context)225 ssize_t rxr_msg_senddata(struct fid_ep *ep, const void *buf, size_t len,
226 void *desc, uint64_t data, fi_addr_t dest_addr,
227 void *context)
228 {
229 struct fi_msg msg;
230 struct iovec iov;
231 struct rxr_ep *rxr_ep;
232
233 iov.iov_base = (void *)buf;
234 iov.iov_len = len;
235
236 memset(&msg, 0, sizeof(msg));
237 msg.msg_iov = &iov;
238 msg.desc = desc;
239 msg.iov_count = 1;
240 msg.addr = dest_addr;
241 msg.context = context;
242 msg.data = data;
243
244 rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
245 return rxr_msg_generic_send(ep, &msg, 0, ofi_op_msg,
246 rxr_tx_flags(rxr_ep) | FI_REMOTE_CQ_DATA);
247 }
248
249 static
rxr_msg_inject(struct fid_ep * ep,const void * buf,size_t len,fi_addr_t dest_addr)250 ssize_t rxr_msg_inject(struct fid_ep *ep, const void *buf, size_t len,
251 fi_addr_t dest_addr)
252 {
253 struct rxr_ep *rxr_ep;
254 struct fi_msg msg;
255 struct iovec iov;
256
257 iov.iov_base = (void *)buf;
258 iov.iov_len = len;
259
260 memset(&msg, 0, sizeof(msg));
261 msg.msg_iov = &iov;
262 msg.iov_count = 1;
263 msg.addr = dest_addr;
264
265 rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
266 assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_msgrtm_hdr));
267
268 return rxr_msg_generic_send(ep, &msg, 0, ofi_op_msg,
269 rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION | FI_INJECT);
270 }
271
272 static
rxr_msg_injectdata(struct fid_ep * ep,const void * buf,size_t len,uint64_t data,fi_addr_t dest_addr)273 ssize_t rxr_msg_injectdata(struct fid_ep *ep, const void *buf,
274 size_t len, uint64_t data,
275 fi_addr_t dest_addr)
276 {
277 struct rxr_ep *rxr_ep;
278 struct fi_msg msg;
279 struct iovec iov;
280
281 iov.iov_base = (void *)buf;
282 iov.iov_len = len;
283
284 memset(&msg, 0, sizeof(msg));
285 msg.msg_iov = &iov;
286 msg.iov_count = 1;
287 msg.addr = dest_addr;
288 msg.data = data;
289
290 rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
291 /*
292 * We advertise the largest possible inject size with no cq data or
293 * source address. This means that we may end up not using the core
294 * providers inject for this send.
295 */
296 assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_msgrtm_hdr));
297 return rxr_msg_generic_send(ep, &msg, 0, ofi_op_msg,
298 rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION |
299 FI_REMOTE_CQ_DATA | FI_INJECT);
300 }
301
302 /**
303 * Tagged send op functions
304 */
305 static
rxr_msg_tsendmsg(struct fid_ep * ep_fid,const struct fi_msg_tagged * tmsg,uint64_t flags)306 ssize_t rxr_msg_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *tmsg,
307 uint64_t flags)
308 {
309 struct fi_msg msg;
310
311 msg.msg_iov = tmsg->msg_iov;
312 msg.desc = tmsg->desc;
313 msg.iov_count = tmsg->iov_count;
314 msg.addr = tmsg->addr;
315 msg.context = tmsg->context;
316 msg.data = tmsg->data;
317 return rxr_msg_generic_send(ep_fid, &msg, tmsg->tag, ofi_op_tagged, flags);
318 }
319
320 static
rxr_msg_tsendv(struct fid_ep * ep_fid,const struct iovec * iov,void ** desc,size_t count,fi_addr_t dest_addr,uint64_t tag,void * context)321 ssize_t rxr_msg_tsendv(struct fid_ep *ep_fid, const struct iovec *iov,
322 void **desc, size_t count, fi_addr_t dest_addr,
323 uint64_t tag, void *context)
324 {
325 struct rxr_ep *rxr_ep;
326 struct fi_msg_tagged msg;
327
328 memset(&msg, 0, sizeof(msg));
329 msg.msg_iov = iov;
330 msg.desc = desc;
331 msg.iov_count = count;
332 msg.addr = dest_addr;
333 msg.context = context;
334 msg.tag = tag;
335
336 rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
337 return rxr_msg_tsendmsg(ep_fid, &msg, rxr_tx_flags(rxr_ep));
338 }
339
340 static
rxr_msg_tsend(struct fid_ep * ep_fid,const void * buf,size_t len,void * desc,fi_addr_t dest_addr,uint64_t tag,void * context)341 ssize_t rxr_msg_tsend(struct fid_ep *ep_fid, const void *buf, size_t len,
342 void *desc, fi_addr_t dest_addr, uint64_t tag,
343 void *context)
344 {
345 struct iovec msg_iov;
346
347 msg_iov.iov_base = (void *)buf;
348 msg_iov.iov_len = len;
349 return rxr_msg_tsendv(ep_fid, &msg_iov, &desc, 1, dest_addr, tag,
350 context);
351 }
352
353 static
rxr_msg_tsenddata(struct fid_ep * ep_fid,const void * buf,size_t len,void * desc,uint64_t data,fi_addr_t dest_addr,uint64_t tag,void * context)354 ssize_t rxr_msg_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len,
355 void *desc, uint64_t data, fi_addr_t dest_addr,
356 uint64_t tag, void *context)
357 {
358 struct fi_msg msg;
359 struct iovec iov;
360 struct rxr_ep *rxr_ep;
361
362 iov.iov_base = (void *)buf;
363 iov.iov_len = len;
364
365 msg.msg_iov = &iov;
366 msg.desc = desc;
367 msg.iov_count = 1;
368 msg.addr = dest_addr;
369 msg.context = context;
370 msg.data = data;
371
372 rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
373 return rxr_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
374 rxr_tx_flags(rxr_ep) | FI_REMOTE_CQ_DATA);
375 }
376
377 static
rxr_msg_tinject(struct fid_ep * ep_fid,const void * buf,size_t len,fi_addr_t dest_addr,uint64_t tag)378 ssize_t rxr_msg_tinject(struct fid_ep *ep_fid, const void *buf, size_t len,
379 fi_addr_t dest_addr, uint64_t tag)
380 {
381 struct rxr_ep *rxr_ep;
382 struct fi_msg msg;
383 struct iovec iov;
384
385 iov.iov_base = (void *)buf;
386 iov.iov_len = len;
387
388 memset(&msg, 0, sizeof(msg));
389 msg.msg_iov = &iov;
390 msg.iov_count = 1;
391 msg.addr = dest_addr;
392
393 rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
394 assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_tagrtm_hdr));
395
396 return rxr_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
397 rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION | FI_INJECT);
398 }
399
400 static
rxr_msg_tinjectdata(struct fid_ep * ep_fid,const void * buf,size_t len,uint64_t data,fi_addr_t dest_addr,uint64_t tag)401 ssize_t rxr_msg_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t len,
402 uint64_t data, fi_addr_t dest_addr, uint64_t tag)
403 {
404 struct rxr_ep *rxr_ep;
405 struct fi_msg msg;
406 struct iovec iov;
407
408 iov.iov_base = (void *)buf;
409 iov.iov_len = len;
410
411 memset(&msg, 0, sizeof(msg));
412 msg.msg_iov = &iov;
413 msg.iov_count = 1;
414 msg.addr = dest_addr;
415 msg.data = data;
416
417 rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
418 /*
419 * We advertise the largest possible inject size with no cq data or
420 * source address. This means that we may end up not using the core
421 * providers inject for this send.
422 */
423 assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_tagrtm_hdr));
424
425 return rxr_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
426 rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION |
427 FI_REMOTE_CQ_DATA | FI_INJECT);
428 }
429
430 /**
431 * Receive functions
432 */
433
434 /**
435 * Utility functions and data structures
436 */
/*
 * Match parameters passed (as the opaque arg) to the unexpected-list
 * match callbacks rxr_msg_match_unexp*().
 */
struct rxr_match_info {
	fi_addr_t addr;		/* source address the receive targets */
	uint64_t tag;		/* tag of the receive request */
	uint64_t ignore;	/* bits of the tag to ignore when matching */
};
442
static
int rxr_msg_match_unexp_anyaddr(struct dlist_entry *item, const void *arg)
{
	/*
	 * Untagged match with no FI_DIRECTED_RECV: any unexpected entry
	 * matches, so both arguments are intentionally unused.
	 */
	(void)item;
	(void)arg;
	return 1;
}
448
449 static
rxr_msg_match_unexp(struct dlist_entry * item,const void * arg)450 int rxr_msg_match_unexp(struct dlist_entry *item, const void *arg)
451 {
452 const struct rxr_match_info *match_info = arg;
453 struct rxr_rx_entry *rx_entry;
454
455 rx_entry = container_of(item, struct rxr_rx_entry, entry);
456
457 return rxr_match_addr(match_info->addr, rx_entry->addr);
458 }
459
460 static
rxr_msg_match_unexp_tagged_anyaddr(struct dlist_entry * item,const void * arg)461 int rxr_msg_match_unexp_tagged_anyaddr(struct dlist_entry *item, const void *arg)
462 {
463 const struct rxr_match_info *match_info = arg;
464 struct rxr_rx_entry *rx_entry;
465
466 rx_entry = container_of(item, struct rxr_rx_entry, entry);
467
468 return rxr_match_tag(rx_entry->tag, match_info->ignore,
469 match_info->tag);
470 }
471
472 static
rxr_msg_match_unexp_tagged(struct dlist_entry * item,const void * arg)473 int rxr_msg_match_unexp_tagged(struct dlist_entry *item, const void *arg)
474 {
475 const struct rxr_match_info *match_info = arg;
476 struct rxr_rx_entry *rx_entry;
477
478 rx_entry = container_of(item, struct rxr_rx_entry, entry);
479
480 return rxr_match_addr(match_info->addr, rx_entry->addr) &&
481 rxr_match_tag(rx_entry->tag, match_info->ignore,
482 match_info->tag);
483 }
484
/**
 * Complete the match between a receive request and an already-arrived
 * unexpected message.
 *
 * Takes ownership of the buffered unexpected packet, fills in the
 * rx_entry's CQ entry fields (context, buf, len, flags, tag), and hands
 * the packet to rxr_pkt_proc_matched_rtm() for data delivery.
 *
 * @return 0 on success, negative libfabric error code on failure
 */
static
int rxr_msg_handle_unexp_match(struct rxr_ep *ep,
			       struct rxr_rx_entry *rx_entry,
			       uint64_t tag, uint64_t ignore,
			       void *context, fi_addr_t addr,
			       uint32_t op, uint64_t flags)
{
	struct rxr_pkt_entry *pkt_entry;
	uint64_t data_len;

	rx_entry->fi_flags = flags;
	rx_entry->ignore = ignore;
	rx_entry->state = RXR_RX_MATCHED;

	/* detach the buffered packet; this function now owns it */
	pkt_entry = rx_entry->unexp_pkt;
	rx_entry->unexp_pkt = NULL;
	data_len = rxr_pkt_rtm_total_len(pkt_entry);

	rx_entry->cq_entry.op_context = context;
	/*
	 * we don't expect recv buf from application for discard,
	 * hence setting to NULL
	 */
	if (OFI_UNLIKELY(flags & FI_DISCARD)) {
		rx_entry->cq_entry.buf = NULL;
		rx_entry->cq_entry.len = data_len;
	} else {
		rx_entry->cq_entry.buf = rx_entry->iov[0].iov_base;
		/* reported length is capped by the posted buffer size */
		data_len = MIN(rx_entry->total_len,
			       ofi_total_iov_len(rx_entry->iov, rx_entry->iov_count));
		rx_entry->cq_entry.len = data_len;
	}

	rx_entry->cq_entry.flags = (FI_RECV | FI_MSG);

	if (op == ofi_op_tagged) {
		rx_entry->cq_entry.flags |= FI_TAGGED;
		rx_entry->cq_entry.tag = rx_entry->tag;
		rx_entry->ignore = ignore;
	} else {
		/* untagged receives match any tag */
		rx_entry->cq_entry.tag = 0;
		rx_entry->ignore = ~0;
	}

	return rxr_pkt_proc_matched_rtm(ep, rx_entry, pkt_entry);
}
531
/*
 * Search the unexpected list for a matching message and process it if found.
 * Returns 0 if a message was matched and processed, -FI_ENOMSG if no match
 * was found, or a negative libfabric error code (e.g. -FI_ENOBUFS) if
 * processing the matched message failed.
 */
static
int rxr_msg_proc_unexp_msg_list(struct rxr_ep *ep, const struct fi_msg *msg,
				uint64_t tag, uint64_t ignore, uint32_t op, uint64_t flags,
				struct rxr_rx_entry *posted_entry)
{
	struct rxr_match_info match_info;
	struct dlist_entry *match;
	struct rxr_rx_entry *rx_entry;
	dlist_func_t *match_func;
	int ret;

	/*
	 * Select the match predicate: the source address is only compared
	 * when the endpoint was opened with FI_DIRECTED_RECV.
	 */
	if (op == ofi_op_tagged) {
		if (ep->util_ep.caps & FI_DIRECTED_RECV)
			match_func = &rxr_msg_match_unexp_tagged;
		else
			match_func = &rxr_msg_match_unexp_tagged_anyaddr;

		match_info.addr = msg->addr;
		match_info.tag = tag;
		match_info.ignore = ignore;
		match = dlist_remove_first_match(&ep->rx_unexp_tagged_list,
						 match_func,
						 (void *)&match_info);
	} else {
		if (ep->util_ep.caps & FI_DIRECTED_RECV)
			match_func = &rxr_msg_match_unexp;
		else
			match_func = &rxr_msg_match_unexp_anyaddr;

		match_info.addr = msg->addr;
		match = dlist_remove_first_match(&ep->rx_unexp_list,
						 match_func,
						 (void *)&match_info);
	}

	if (!match)
		return -FI_ENOMSG;

	rx_entry = container_of(match, struct rxr_rx_entry, entry);

	/*
	 * Initialize the matched entry as a multi-recv consumer if the posted
	 * buffer is a multi-recv buffer.
	 */
	if (posted_entry) {
		/*
		 * rxr_ep_split_rx_entry will setup rx_entry iov and count
		 */
		rx_entry = rxr_ep_split_rx_entry(ep, posted_entry, rx_entry,
						 rx_entry->unexp_pkt);
		if (OFI_UNLIKELY(!rx_entry)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"RX entries exhausted.\n");
			return -FI_ENOBUFS;
		}
	} else {
		/* regular receive: adopt the application's iov */
		memcpy(rx_entry->iov, msg->msg_iov, sizeof(*rx_entry->iov) * msg->iov_count);
		rx_entry->iov_count = msg->iov_count;
	}

	if (msg->desc)
		memcpy(rx_entry->desc, msg->desc, sizeof(void*) * msg->iov_count);

	FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
	       "Match found in unexp list for a posted recv msg_id: %" PRIu32
	       " total_len: %" PRIu64 " tag: %lx\n",
	       rx_entry->msg_id, rx_entry->total_len, rx_entry->tag);

	ret = rxr_msg_handle_unexp_match(ep, rx_entry, tag, ignore,
					 msg->context, msg->addr, op, flags);
	return ret;
}
608
rxr_msg_multi_recv_buffer_available(struct rxr_ep * ep,struct rxr_rx_entry * rx_entry)609 bool rxr_msg_multi_recv_buffer_available(struct rxr_ep *ep,
610 struct rxr_rx_entry *rx_entry)
611 {
612 assert(rx_entry->fi_flags & FI_MULTI_RECV);
613 assert(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED);
614
615 return (ofi_total_iov_len(rx_entry->iov, rx_entry->iov_count)
616 >= ep->min_multi_recv_size);
617 }
618
619 static inline
rxr_msg_multi_recv_buffer_complete(struct rxr_ep * ep,struct rxr_rx_entry * rx_entry)620 bool rxr_msg_multi_recv_buffer_complete(struct rxr_ep *ep,
621 struct rxr_rx_entry *rx_entry)
622 {
623 assert(rx_entry->fi_flags & FI_MULTI_RECV);
624 assert(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED);
625
626 return (!rxr_msg_multi_recv_buffer_available(ep, rx_entry) &&
627 dlist_empty(&rx_entry->multi_recv_consumers));
628 }
629
rxr_msg_multi_recv_free_posted_entry(struct rxr_ep * ep,struct rxr_rx_entry * rx_entry)630 void rxr_msg_multi_recv_free_posted_entry(struct rxr_ep *ep,
631 struct rxr_rx_entry *rx_entry)
632 {
633 assert(!(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED));
634
635 if ((rx_entry->rxr_flags & RXR_MULTI_RECV_CONSUMER) &&
636 rxr_msg_multi_recv_buffer_complete(ep, rx_entry->master_entry))
637 rxr_release_rx_entry(ep, rx_entry->master_entry);
638 }
639
/**
 * Post an FI_MULTI_RECV buffer (untagged receives only; caller holds the
 * util_ep lock).
 *
 * Allocates a posted rx_entry that tracks the application's buffer, then
 * drains matching unexpected messages into it until the buffer drops
 * below min_multi_recv_size or the unexpected list is exhausted.
 *
 * @return 0 on success; -FI_EINVAL for tagged ops or undersized buffers;
 *	   -FI_EAGAIN when no rx_entry is available; or an error from
 *	   unexpected-message processing.
 */
static
ssize_t rxr_msg_multi_recv(struct rxr_ep *rxr_ep, const struct fi_msg *msg,
			   uint64_t tag, uint64_t ignore, uint32_t op, uint64_t flags)
{
	struct rxr_rx_entry *rx_entry;
	int ret = 0;

	/* buffer must hold at least one minimum-size message; untagged only */
	if ((ofi_total_iov_len(msg->msg_iov, msg->iov_count)
	     < rxr_ep->min_multi_recv_size) || op != ofi_op_msg)
		return -FI_EINVAL;

	/*
	 * Always get new rx_entry of type RXR_MULTI_RECV_POSTED when in the
	 * multi recv path. The posted entry will not be used for receiving
	 * messages but will be used for tracking the application's buffer and
	 * when to write the completion to release the buffer.
	 */
	rx_entry = rxr_ep_get_rx_entry(rxr_ep, msg, tag, ignore, op, flags);
	if (OFI_UNLIKELY(!rx_entry)) {
		rxr_ep_progress_internal(rxr_ep);
		return -FI_EAGAIN;
	}

	rx_entry->rxr_flags |= RXR_MULTI_RECV_POSTED;
	dlist_init(&rx_entry->multi_recv_consumers);
	dlist_init(&rx_entry->multi_recv_entry);

	/* drain unexpected messages into the buffer while space remains */
	while (!dlist_empty(&rxr_ep->rx_unexp_list)) {
		ret = rxr_msg_proc_unexp_msg_list(rxr_ep, msg, tag,
						  ignore, op, flags, rx_entry);

		if (!rxr_msg_multi_recv_buffer_available(rxr_ep, rx_entry)) {
			/*
			 * Multi recv buffer consumed by short, unexp messages,
			 * free posted rx_entry.
			 */
			if (rxr_msg_multi_recv_buffer_complete(rxr_ep, rx_entry))
				rxr_release_rx_entry(rxr_ep, rx_entry);
			/*
			 * Multi recv buffer has been consumed, but waiting on
			 * long msg completion. Last msg completion will free
			 * posted rx_entry.
			 */
			if (ret == -FI_ENOMSG)
				return 0;
			return ret;
		}

		if (ret == -FI_ENOMSG) {
			/* no more matches; fall through and post the entry */
			ret = 0;
			break;
		}

		/*
		 * Error was encountered when processing unexpected messages,
		 * but there is buffer space available. Add the posted entry to
		 * the rx_list.
		 */
		if (ret)
			break;
	}

	dlist_insert_tail(&rx_entry->entry, &rxr_ep->rx_list);
	return ret;
}
705
/**
 * Handle completion of one message delivered into a multi-recv buffer.
 *
 * Detaches the consumer rx_entry from its master's consumer list and, if
 * the master's buffer is now fully consumed with no consumers left, marks
 * this (last) completion with FI_MULTI_RECV so the application knows the
 * posted buffer has been released.
 */
void rxr_msg_multi_recv_handle_completion(struct rxr_ep *ep,
					  struct rxr_rx_entry *rx_entry)
{
	/* only consumer entries (never the posted master) come through here */
	assert(!(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED) &&
	       (rx_entry->rxr_flags & RXR_MULTI_RECV_CONSUMER));

	dlist_remove(&rx_entry->multi_recv_entry);
	rx_entry->rxr_flags &= ~RXR_MULTI_RECV_CONSUMER;

	if (!rxr_msg_multi_recv_buffer_complete(ep, rx_entry->master_entry))
		return;

	/*
	 * Buffer is consumed and all messages have been received. Update the
	 * last message to release the application buffer.
	 */
	rx_entry->cq_entry.flags |= FI_MULTI_RECV;
}
724
/*
 * Common receive path: first try to match the request against the
 * unexpected message list; if nothing matches, create an rx entry and
 * append it to the posted receive list.
 */
static
ssize_t rxr_msg_generic_recv(struct fid_ep *ep, const struct fi_msg *msg,
			     uint64_t tag, uint64_t ignore, uint32_t op,
			     uint64_t flags)
{
	ssize_t ret = 0;
	struct rxr_ep *rxr_ep;
	struct dlist_entry *unexp_list;
	struct rxr_rx_entry *rx_entry;
	uint64_t rx_op_flags;

	FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
	       "%s: iov_len: %lu tag: %lx ignore: %lx op: %x flags: %lx\n",
	       __func__, ofi_total_iov_len(msg->msg_iov, msg->iov_count), tag, ignore,
	       op, flags);

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);

	assert(msg->iov_count <= rxr_ep->rx_iov_limit);

	rxr_perfset_start(rxr_ep, perf_rxr_recv);

	/*
	 * Merge the endpoint's default RX op flags into the caller's flags;
	 * FI_COMPLETION is stripped when rx_msg_flags does not request it.
	 */
	assert(rxr_ep->util_ep.rx_msg_flags == 0 || rxr_ep->util_ep.rx_msg_flags == FI_COMPLETION);
	rx_op_flags = rxr_ep->util_ep.rx_op_flags;
	if (rxr_ep->util_ep.rx_msg_flags == 0)
		rx_op_flags &= ~FI_COMPLETION;
	flags = flags | rx_op_flags;

	fastlock_acquire(&rxr_ep->util_ep.lock);
	if (OFI_UNLIKELY(is_rx_res_full(rxr_ep))) {
		ret = -FI_EAGAIN;
		goto out;
	}

	/* multi-recv buffers take a dedicated posting path */
	if (flags & FI_MULTI_RECV) {
		ret = rxr_msg_multi_recv(rxr_ep, msg, tag, ignore, op, flags);
		goto out;
	}

	unexp_list = (op == ofi_op_tagged) ? &rxr_ep->rx_unexp_tagged_list :
		     &rxr_ep->rx_unexp_list;

	/* -FI_ENOMSG means nothing matched; anything else is final */
	if (!dlist_empty(unexp_list)) {
		ret = rxr_msg_proc_unexp_msg_list(rxr_ep, msg, tag,
						  ignore, op, flags, NULL);

		if (ret != -FI_ENOMSG)
			goto out;
		ret = 0;
	}

	rx_entry = rxr_ep_get_rx_entry(rxr_ep, msg, tag,
				       ignore, op, flags);

	if (OFI_UNLIKELY(!rx_entry)) {
		/* out of rx entries; progress the EP and ask caller to retry */
		ret = -FI_EAGAIN;
		rxr_ep_progress_internal(rxr_ep);
		goto out;
	}

	if (op == ofi_op_tagged)
		dlist_insert_tail(&rx_entry->entry, &rxr_ep->rx_tagged_list);
	else
		dlist_insert_tail(&rx_entry->entry, &rxr_ep->rx_list);

out:
	fastlock_release(&rxr_ep->util_ep.lock);

	rxr_perfset_end(rxr_ep, perf_rxr_recv);
	return ret;
}
800
801 static
rxr_msg_discard_trecv(struct rxr_ep * ep,struct rxr_rx_entry * rx_entry,const struct fi_msg_tagged * msg,int64_t flags)802 ssize_t rxr_msg_discard_trecv(struct rxr_ep *ep,
803 struct rxr_rx_entry *rx_entry,
804 const struct fi_msg_tagged *msg,
805 int64_t flags)
806 {
807 int ret;
808
809 if ((flags & FI_DISCARD) && !(flags & (FI_PEEK | FI_CLAIM)))
810 return -FI_EINVAL;
811
812 rx_entry->fi_flags |= FI_DISCARD;
813 rx_entry->rxr_flags |= RXR_RECV_CANCEL;
814 ret = ofi_cq_write(ep->util_ep.rx_cq, msg->context,
815 FI_TAGGED | FI_RECV | FI_MSG,
816 0, NULL, rx_entry->cq_entry.data,
817 rx_entry->cq_entry.tag);
818 rxr_rm_rx_cq_check(ep, ep->util_ep.rx_cq);
819 return ret;
820 }
821
/**
 * Handle an FI_CLAIM tagged receive.
 *
 * The rx_entry was stashed in the application's fi_context->internal[0]
 * by an earlier FI_PEEK|FI_CLAIM (see rxr_msg_peek_trecv()); complete it
 * into the supplied buffer, or sink it for FI_DISCARD.
 */
static
ssize_t rxr_msg_claim_trecv(struct fid_ep *ep_fid,
			    const struct fi_msg_tagged *msg,
			    int64_t flags)
{
	ssize_t ret = 0;
	struct rxr_ep *ep;
	struct rxr_rx_entry *rx_entry;
	struct fi_context *context;

	ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
	fastlock_acquire(&ep->util_ep.lock);

	/* recover the rx_entry stashed by the earlier peek+claim */
	context = (struct fi_context *)msg->context;
	rx_entry = (struct rxr_rx_entry *)context->internal[0];

	if (flags & FI_DISCARD) {
		ret = rxr_msg_discard_trecv(ep, rx_entry, msg, flags);
		if (OFI_UNLIKELY(ret))
			goto out;
	}

	/*
	 * Handle unexp match entry even for discard entry as we are sinking
	 * messages for that case
	 */
	memcpy(rx_entry->iov, msg->msg_iov,
	       sizeof(*msg->msg_iov) * msg->iov_count);
	rx_entry->iov_count = msg->iov_count;

	ret = rxr_msg_handle_unexp_match(ep, rx_entry, msg->tag,
					 msg->ignore, msg->context,
					 msg->addr, ofi_op_tagged, flags);

out:
	fastlock_release(&ep->util_ep.lock);
	return ret;
}
860
/**
 * Handle FI_PEEK on the tagged receive path.
 *
 * Searches the unexpected tagged list for a match. On miss, a peek error
 * completion is written. On hit:
 *  - FI_CLAIM: the rx_entry is stashed in the fi_context for a later
 *    claim and removed from the unexpected list;
 *  - FI_DISCARD: the message is completed and sunk;
 *  - plain peek: a completion describing the message is written and the
 *    message remains on the unexpected list.
 */
static
ssize_t rxr_msg_peek_trecv(struct fid_ep *ep_fid,
			   const struct fi_msg_tagged *msg,
			   uint64_t flags)
{
	ssize_t ret = 0;
	struct rxr_ep *ep;
	struct dlist_entry *match;
	dlist_func_t *match_func;
	struct rxr_match_info match_info;
	struct rxr_rx_entry *rx_entry;
	struct fi_context *context;
	struct rxr_pkt_entry *pkt_entry;
	size_t data_len;
	/* NOTE(review): tags are uint64_t elsewhere in this file; the
	 * signed type here survives the round-trip but is inconsistent —
	 * confirm before changing. */
	int64_t tag;

	ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);

	fastlock_acquire(&ep->util_ep.lock);

	/* progress first so freshly arrived messages are peekable */
	rxr_ep_progress_internal(ep);
	match_info.addr = msg->addr;
	match_info.tag = msg->tag;
	match_info.ignore = msg->ignore;

	if (ep->util_ep.caps & FI_DIRECTED_RECV)
		match_func = &rxr_msg_match_unexp_tagged;
	else
		match_func = &rxr_msg_match_unexp_tagged_anyaddr;

	match = dlist_find_first_match(&ep->rx_unexp_tagged_list,
				       match_func,
				       (void *)&match_info);
	if (!match) {
		FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
		       "Message not found addr: %" PRIu64
		       " tag: %lx ignore %lx\n", msg->addr, msg->tag,
		       msg->ignore);
		ret = ofi_cq_write_error_peek(ep->util_ep.rx_cq, msg->tag,
					      msg->context);
		goto out;
	}

	rx_entry = container_of(match, struct rxr_rx_entry, entry);
	context = (struct fi_context *)msg->context;
	if (flags & FI_CLAIM) {
		/* stash the entry for the later FI_CLAIM recv */
		context->internal[0] = rx_entry;
		dlist_remove(match);
	} else if (flags & FI_DISCARD) {
		dlist_remove(match);

		ret = rxr_msg_discard_trecv(ep, rx_entry, msg, flags);
		if (ret)
			goto out;

		memcpy(rx_entry->iov, msg->msg_iov,
		       sizeof(*msg->msg_iov) * msg->iov_count);
		rx_entry->iov_count = msg->iov_count;

		/* sink the message data even though it is being discarded */
		ret = rxr_msg_handle_unexp_match(ep, rx_entry,
						 msg->tag, msg->ignore,
						 msg->context, msg->addr,
						 ofi_op_tagged, flags);

		goto out;
	}

	/* plain peek: report the message without consuming it */
	pkt_entry = rx_entry->unexp_pkt;
	data_len = rxr_pkt_rtm_total_len(pkt_entry);
	tag = rxr_pkt_rtm_tag(pkt_entry);

	if (ep->util_ep.caps & FI_SOURCE)
		ret = ofi_cq_write_src(ep->util_ep.rx_cq, context,
				       FI_TAGGED | FI_RECV,
				       data_len, NULL,
				       rx_entry->cq_entry.data, tag,
				       rx_entry->addr);
	else
		ret = ofi_cq_write(ep->util_ep.rx_cq, context,
				   FI_TAGGED | FI_RECV,
				   data_len, NULL,
				   rx_entry->cq_entry.data, tag);
	rxr_rm_rx_cq_check(ep, ep->util_ep.rx_cq);
out:
	fastlock_release(&ep->util_ep.lock);
	return ret;
}
948
949 /**
950 * Non-tagged receive ops
951 */
static
ssize_t rxr_msg_recvmsg(struct fid_ep *ep_fid, const struct fi_msg *msg,
			uint64_t flags)
{
	/* Untagged recvmsg: tag/ignore are unused (0) for ofi_op_msg. */
	return rxr_msg_generic_recv(ep_fid, msg, 0, 0, ofi_op_msg, flags);
}
958
959 static
rxr_msg_recv(struct fid_ep * ep,void * buf,size_t len,void * desc,fi_addr_t src_addr,void * context)960 ssize_t rxr_msg_recv(struct fid_ep *ep, void *buf, size_t len,
961 void *desc, fi_addr_t src_addr, void *context)
962 {
963 struct fi_msg msg;
964 struct iovec msg_iov;
965
966 memset(&msg, 0, sizeof(msg));
967 msg_iov.iov_base = buf;
968 msg_iov.iov_len = len;
969
970 msg.msg_iov = &msg_iov;
971 msg.desc = &desc;
972 msg.iov_count = 1;
973 msg.addr = src_addr;
974 msg.context = context;
975 msg.data = 0;
976
977 return rxr_msg_recvmsg(ep, &msg, 0);
978 }
979
980 static
rxr_msg_recvv(struct fid_ep * ep,const struct iovec * iov,void ** desc,size_t count,fi_addr_t src_addr,void * context)981 ssize_t rxr_msg_recvv(struct fid_ep *ep, const struct iovec *iov,
982 void **desc, size_t count, fi_addr_t src_addr,
983 void *context)
984 {
985 struct fi_msg msg;
986
987 memset(&msg, 0, sizeof(msg));
988 msg.msg_iov = iov;
989 msg.desc = desc;
990 msg.iov_count = count;
991 msg.addr = src_addr;
992 msg.context = context;
993 msg.data = 0;
994
995 return rxr_msg_recvmsg(ep, &msg, 0);
996 }
997
998 /**
999 * Tagged receive ops functions
1000 */
1001 static
rxr_msg_trecv(struct fid_ep * ep_fid,void * buf,size_t len,void * desc,fi_addr_t src_addr,uint64_t tag,uint64_t ignore,void * context)1002 ssize_t rxr_msg_trecv(struct fid_ep *ep_fid, void *buf, size_t len, void *desc,
1003 fi_addr_t src_addr, uint64_t tag, uint64_t ignore,
1004 void *context)
1005 {
1006 struct fi_msg msg;
1007 struct iovec msg_iov;
1008
1009 msg_iov.iov_base = (void *)buf;
1010 msg_iov.iov_len = len;
1011
1012 msg.msg_iov = &msg_iov;
1013 msg.iov_count = 1;
1014 msg.addr = src_addr;
1015 msg.context = context;
1016 msg.desc = &desc;
1017
1018 return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, 0);
1019 }
1020
1021 static
rxr_msg_trecvv(struct fid_ep * ep_fid,const struct iovec * iov,void ** desc,size_t count,fi_addr_t src_addr,uint64_t tag,uint64_t ignore,void * context)1022 ssize_t rxr_msg_trecvv(struct fid_ep *ep_fid, const struct iovec *iov,
1023 void **desc, size_t count, fi_addr_t src_addr,
1024 uint64_t tag, uint64_t ignore, void *context)
1025 {
1026 struct fi_msg msg;
1027
1028 msg.msg_iov = iov;
1029 msg.iov_count = count;
1030 msg.addr = src_addr;
1031 msg.desc = desc;
1032 msg.context = context;
1033
1034 return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, 0);
1035 }
1036
1037 static
rxr_msg_trecvmsg(struct fid_ep * ep_fid,const struct fi_msg_tagged * tagmsg,uint64_t flags)1038 ssize_t rxr_msg_trecvmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *tagmsg,
1039 uint64_t flags)
1040 {
1041 ssize_t ret;
1042 struct fi_msg msg;
1043
1044 if (flags & FI_PEEK) {
1045 ret = rxr_msg_peek_trecv(ep_fid, tagmsg, flags);
1046 goto out;
1047 } else if (flags & FI_CLAIM) {
1048 ret = rxr_msg_claim_trecv(ep_fid, tagmsg, flags);
1049 goto out;
1050 }
1051
1052 msg.msg_iov = tagmsg->msg_iov;
1053 msg.iov_count = tagmsg->iov_count;
1054 msg.addr = tagmsg->addr;
1055 msg.desc = tagmsg->desc;
1056 msg.context = tagmsg->context;
1057
1058 ret = rxr_msg_generic_recv(ep_fid, &msg, tagmsg->tag, tagmsg->ignore,
1059 ofi_op_tagged, flags);
1060
1061 out:
1062 return ret;
1063 }
1064
1065 /**
1066 * Ops structures used by rxr_endpoint()
1067 */
/*
 * fi_ops_msg vtable installed on the RxR endpoint; every entry funnels
 * into rxr_msg_generic_send()/rxr_msg_generic_recv().
 */
struct fi_ops_msg rxr_ops_msg = {
	.size = sizeof(struct fi_ops_msg),
	.send = rxr_msg_send,
	.sendv = rxr_msg_sendv,
	.sendmsg = rxr_msg_sendmsg,
	.senddata = rxr_msg_senddata,
	.inject = rxr_msg_inject,
	.injectdata = rxr_msg_injectdata,
	.recv = rxr_msg_recv,
	.recvv = rxr_msg_recvv,
	.recvmsg = rxr_msg_recvmsg,
};
1080
/*
 * fi_ops_tagged vtable installed on the RxR endpoint; tagged variants of
 * the same send/recv paths, with FI_PEEK/FI_CLAIM handled in
 * rxr_msg_trecvmsg().
 */
struct fi_ops_tagged rxr_ops_tagged = {
	.size = sizeof(struct fi_ops_tagged),
	.send = rxr_msg_tsend,
	.sendv = rxr_msg_tsendv,
	.sendmsg = rxr_msg_tsendmsg,
	.senddata = rxr_msg_tsenddata,
	.inject = rxr_msg_tinject,
	.injectdata = rxr_msg_tinjectdata,
	.recv = rxr_msg_trecv,
	.recvv = rxr_msg_trecvv,
	.recvmsg = rxr_msg_trecvmsg,
};
1093
1094