/*
 * Copyright (c) 2019-2020 Amazon.com, Inc. or its affiliates.
 * All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include "ofi.h"
#include <ofi_util.h>
#include <ofi_iov.h>

#include "efa.h"
#include "rxr.h"
#include "rxr_msg.h"
#include "rxr_pkt_cmd.h"

/**
 * This file defines the msg ops functions.
 * It consists of the following sections:
 *     send functions,
 *     receive functions, and
 *     the ops structures.
 */

/**
 *  Send functions
 */

/**
 *   Utility functions used by both non-tagged and tagged send.
 */
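/*
 * Post a request-to-message (RTM) packet for a tx_entry, selecting the
 * protocol based on message size and peer capabilities: eager RTM when the
 * payload fits in a single request packet, medium RTM up to
 * rxr_env.efa_max_medium_msg_size, read RTM when both sides support RDMA
 * read, and long RTM otherwise. Shared-memory peers use eager or read RTM.
 */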
ssize_t rxr_msg_post_rtm(struct rxr_ep *rxr_ep, struct rxr_tx_entry *tx_entry)
{
	/*
	 * For performance reasons, this function assumes that a tagged RTM
	 * packet type ID is always the corresponding message RTM packet type
	 * ID + 1, hence the assertions here.
	 */
	assert(RXR_EAGER_MSGRTM_PKT + 1 == RXR_EAGER_TAGRTM_PKT);
	assert(RXR_READ_MSGRTM_PKT + 1 == RXR_READ_TAGRTM_PKT);
	assert(RXR_LONG_MSGRTM_PKT + 1 == RXR_LONG_TAGRTM_PKT);
	assert(RXR_MEDIUM_MSGRTM_PKT + 1 == RXR_MEDIUM_TAGRTM_PKT);

	int tagged;
	size_t max_rtm_data_size;
	ssize_t err;
	struct rxr_peer *peer;

	assert(tx_entry->op == ofi_op_msg || tx_entry->op == ofi_op_tagged);
	tagged = (tx_entry->op == ofi_op_tagged);
	assert(tagged == 0 || tagged == 1);

	max_rtm_data_size = rxr_pkt_req_max_data_size(rxr_ep,
						      tx_entry->addr,
						      RXR_EAGER_MSGRTM_PKT + tagged);

	peer = rxr_ep_get_peer(rxr_ep, tx_entry->addr);

	if (peer->is_local) {
		assert(rxr_ep->use_shm);
		/* intra instance message */
		int rtm_type = (tx_entry->total_len <= max_rtm_data_size) ? RXR_EAGER_MSGRTM_PKT
									  : RXR_READ_MSGRTM_PKT;

		return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry, rtm_type + tagged, 0);
	}

	/* inter instance message */
	if (tx_entry->total_len <= max_rtm_data_size)
		return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
						  RXR_EAGER_MSGRTM_PKT + tagged, 0);

	if (tx_entry->total_len <= rxr_env.efa_max_medium_msg_size) {
		/* we do not check the return value of rxr_ep_tx_init_mr_desc()
		 * because the medium message protocol works even if MR
		 * registration fails
		 */
		if (efa_mr_cache_enable)
			rxr_ep_tx_init_mr_desc(rxr_ep_domain(rxr_ep),
					       tx_entry, 0, FI_SEND);
		return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
						  RXR_MEDIUM_MSGRTM_PKT + tagged, 0);
	}

	if (efa_both_support_rdma_read(rxr_ep, peer) &&
	    (tx_entry->desc[0] || efa_mr_cache_enable)) {
		/* use the read message protocol */
		err = rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
						 RXR_READ_MSGRTM_PKT + tagged, 0);

		if (err != -FI_ENOMEM)
			return err;

		/*
		 * If memory registration failed, we continue here
		 * and fall back to the long message protocol.
		 */
	}

	err = rxr_ep_set_tx_credit_request(rxr_ep, tx_entry);
	if (OFI_UNLIKELY(err))
		return err;

	return rxr_pkt_post_ctrl_or_queue(rxr_ep, RXR_TX_ENTRY, tx_entry,
					  RXR_LONG_MSGRTM_PKT + tagged, 0);
}

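/*
 * Common send path shared by all non-tagged and tagged send operations:
 * allocate a tx_entry, assign it the peer's next message ID, and post an
 * RTM packet via rxr_msg_post_rtm(). On failure the tx_entry is released
 * and the message ID is rolled back so the caller can safely retry.
 */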
ssize_t rxr_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg,
			     uint64_t tag, uint32_t op, uint64_t flags)
{
	struct rxr_ep *rxr_ep;
	ssize_t err;
	struct rxr_tx_entry *tx_entry;
	struct rxr_peer *peer;

	FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
	       "iov_len: %lu tag: %lx op: %x flags: %lx\n",
	       ofi_total_iov_len(msg->msg_iov, msg->iov_count),
	       tag, op, flags);

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
	assert(msg->iov_count <= rxr_ep->tx_iov_limit);

	rxr_perfset_start(rxr_ep, perf_rxr_tx);
	fastlock_acquire(&rxr_ep->util_ep.lock);

	if (OFI_UNLIKELY(is_tx_res_full(rxr_ep))) {
		err = -FI_EAGAIN;
		goto out;
	}

	tx_entry = rxr_ep_alloc_tx_entry(rxr_ep, msg, op, tag, flags);

	if (OFI_UNLIKELY(!tx_entry)) {
		err = -FI_EAGAIN;
		rxr_ep_progress_internal(rxr_ep);
		goto out;
	}

	assert(tx_entry->op == ofi_op_msg || tx_entry->op == ofi_op_tagged);

	peer = rxr_ep_get_peer(rxr_ep, tx_entry->addr);
	assert(peer);
	tx_entry->msg_id = peer->next_msg_id++;
	err = rxr_msg_post_rtm(rxr_ep, tx_entry);
	if (OFI_UNLIKELY(err)) {
		rxr_release_tx_entry(rxr_ep, tx_entry);
		peer->next_msg_id--;
	}

out:
	fastlock_release(&rxr_ep->util_ep.lock);
	rxr_perfset_end(rxr_ep, perf_rxr_tx);
	return err;
}

/**
 *   Non-tagged send ops functions
 */
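/*
 * Each of the wrappers below packages its arguments into a struct fi_msg
 * and funnels the call into rxr_msg_generic_send() with the appropriate
 * operation type and flags.
 */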
static
ssize_t rxr_msg_sendmsg(struct fid_ep *ep, const struct fi_msg *msg,
			uint64_t flags)
{
	return rxr_msg_generic_send(ep, msg, 0, ofi_op_msg, flags);
}

static
ssize_t rxr_msg_sendv(struct fid_ep *ep, const struct iovec *iov,
		      void **desc, size_t count, fi_addr_t dest_addr,
		      void *context)
{
	struct rxr_ep *rxr_ep;
	struct fi_msg msg;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = iov;
	msg.desc = desc;
	msg.iov_count = count;
	msg.addr = dest_addr;
	msg.context = context;

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
	return rxr_msg_sendmsg(ep, &msg, rxr_tx_flags(rxr_ep));
}

static
ssize_t rxr_msg_send(struct fid_ep *ep, const void *buf, size_t len,
		     void *desc, fi_addr_t dest_addr, void *context)
{
	struct iovec iov;

	iov.iov_base = (void *)buf;
	iov.iov_len = len;
	return rxr_msg_sendv(ep, &iov, &desc, 1, dest_addr, context);
}

static
ssize_t rxr_msg_senddata(struct fid_ep *ep, const void *buf, size_t len,
			 void *desc, uint64_t data, fi_addr_t dest_addr,
			 void *context)
{
	struct fi_msg msg;
	struct iovec iov;
	struct rxr_ep *rxr_ep;

	iov.iov_base = (void *)buf;
	iov.iov_len = len;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.desc = &desc;
	msg.iov_count = 1;
	msg.addr = dest_addr;
	msg.context = context;
	msg.data = data;

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
	return rxr_msg_generic_send(ep, &msg, 0, ofi_op_msg,
				    rxr_tx_flags(rxr_ep) | FI_REMOTE_CQ_DATA);
}

static
ssize_t rxr_msg_inject(struct fid_ep *ep, const void *buf, size_t len,
		       fi_addr_t dest_addr)
{
	struct rxr_ep *rxr_ep;
	struct fi_msg msg;
	struct iovec iov;

	iov.iov_base = (void *)buf;
	iov.iov_len = len;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.iov_count = 1;
	msg.addr = dest_addr;

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
	assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_msgrtm_hdr));

	return rxr_msg_generic_send(ep, &msg, 0, ofi_op_msg,
				    rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION | FI_INJECT);
}

static
ssize_t rxr_msg_injectdata(struct fid_ep *ep, const void *buf,
			   size_t len, uint64_t data,
			   fi_addr_t dest_addr)
{
	struct rxr_ep *rxr_ep;
	struct fi_msg msg;
	struct iovec iov;

	iov.iov_base = (void *)buf;
	iov.iov_len = len;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.iov_count = 1;
	msg.addr = dest_addr;
	msg.data = data;

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);
	/*
	 * We advertise the largest possible inject size with no CQ data or
	 * source address. This means that we may end up not using the core
	 * provider's inject for this send.
	 */
	assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_msgrtm_hdr));
	return rxr_msg_generic_send(ep, &msg, 0, ofi_op_msg,
				    rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION |
				    FI_REMOTE_CQ_DATA | FI_INJECT);
}

/**
 *   Tagged send ops functions
 */
static
ssize_t rxr_msg_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *tmsg,
			 uint64_t flags)
{
	struct fi_msg msg;

	msg.msg_iov = tmsg->msg_iov;
	msg.desc = tmsg->desc;
	msg.iov_count = tmsg->iov_count;
	msg.addr = tmsg->addr;
	msg.context = tmsg->context;
	msg.data = tmsg->data;
	return rxr_msg_generic_send(ep_fid, &msg, tmsg->tag, ofi_op_tagged, flags);
}

static
ssize_t rxr_msg_tsendv(struct fid_ep *ep_fid, const struct iovec *iov,
		       void **desc, size_t count, fi_addr_t dest_addr,
		       uint64_t tag, void *context)
{
	struct rxr_ep *rxr_ep;
	struct fi_msg_tagged msg;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = iov;
	msg.desc = desc;
	msg.iov_count = count;
	msg.addr = dest_addr;
	msg.context = context;
	msg.tag = tag;

	rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
	return rxr_msg_tsendmsg(ep_fid, &msg, rxr_tx_flags(rxr_ep));
}

static
ssize_t rxr_msg_tsend(struct fid_ep *ep_fid, const void *buf, size_t len,
		      void *desc, fi_addr_t dest_addr, uint64_t tag,
		      void *context)
{
	struct iovec msg_iov;

	msg_iov.iov_base = (void *)buf;
	msg_iov.iov_len = len;
	return rxr_msg_tsendv(ep_fid, &msg_iov, &desc, 1, dest_addr, tag,
			      context);
}

static
ssize_t rxr_msg_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len,
			  void *desc, uint64_t data, fi_addr_t dest_addr,
			  uint64_t tag, void *context)
{
	struct fi_msg msg;
	struct iovec iov;
	struct rxr_ep *rxr_ep;

	iov.iov_base = (void *)buf;
	iov.iov_len = len;

	msg.msg_iov = &iov;
	msg.desc = &desc;
	msg.iov_count = 1;
	msg.addr = dest_addr;
	msg.context = context;
	msg.data = data;

	rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
	return rxr_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
				    rxr_tx_flags(rxr_ep) | FI_REMOTE_CQ_DATA);
}

static
ssize_t rxr_msg_tinject(struct fid_ep *ep_fid, const void *buf, size_t len,
			fi_addr_t dest_addr, uint64_t tag)
{
	struct rxr_ep *rxr_ep;
	struct fi_msg msg;
	struct iovec iov;

	iov.iov_base = (void *)buf;
	iov.iov_len = len;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.iov_count = 1;
	msg.addr = dest_addr;

	rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
	assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_tagrtm_hdr));

	return rxr_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
				    rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION | FI_INJECT);
}

static
ssize_t rxr_msg_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t len,
			    uint64_t data, fi_addr_t dest_addr, uint64_t tag)
{
	struct rxr_ep *rxr_ep;
	struct fi_msg msg;
	struct iovec iov;

	iov.iov_base = (void *)buf;
	iov.iov_len = len;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.iov_count = 1;
	msg.addr = dest_addr;
	msg.data = data;

	rxr_ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
	/*
	 * We advertise the largest possible inject size with no CQ data or
	 * source address. This means that we may end up not using the core
	 * provider's inject for this send.
	 */
	assert(len <= rxr_ep->core_inject_size - sizeof(struct rxr_eager_tagrtm_hdr));

	return rxr_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
				    rxr_tx_flags(rxr_ep) | RXR_NO_COMPLETION |
				    FI_REMOTE_CQ_DATA | FI_INJECT);
}

/**
 *  Receive functions
 */

/**
 *   Utility functions and data structures
 */
struct rxr_match_info {
	fi_addr_t addr;
	uint64_t tag;
	uint64_t ignore;
};

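/*
 * The match callbacks below are passed to dlist_find_first_match() /
 * dlist_remove_first_match() with a struct rxr_match_info as the argument,
 * comparing the posted address and/or tag against a queued unexpected
 * rx_entry.
 */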
static
int rxr_msg_match_unexp_anyaddr(struct dlist_entry *item, const void *arg)
{
	return 1;
}

static
int rxr_msg_match_unexp(struct dlist_entry *item, const void *arg)
{
	const struct rxr_match_info *match_info = arg;
	struct rxr_rx_entry *rx_entry;

	rx_entry = container_of(item, struct rxr_rx_entry, entry);

	return rxr_match_addr(match_info->addr, rx_entry->addr);
}

static
int rxr_msg_match_unexp_tagged_anyaddr(struct dlist_entry *item, const void *arg)
{
	const struct rxr_match_info *match_info = arg;
	struct rxr_rx_entry *rx_entry;

	rx_entry = container_of(item, struct rxr_rx_entry, entry);

	return rxr_match_tag(rx_entry->tag, match_info->ignore,
			     match_info->tag);
}

static
int rxr_msg_match_unexp_tagged(struct dlist_entry *item, const void *arg)
{
	const struct rxr_match_info *match_info = arg;
	struct rxr_rx_entry *rx_entry;

	rx_entry = container_of(item, struct rxr_rx_entry, entry);

	return rxr_match_addr(match_info->addr, rx_entry->addr) &&
	       rxr_match_tag(rx_entry->tag, match_info->ignore,
			     match_info->tag);
}

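/*
 * Complete the match between an application receive and a queued unexpected
 * message: take ownership of the buffered unexpected packet, fill in the
 * completion entry (length, tag, flags), and hand the packet to
 * rxr_pkt_proc_matched_rtm() for data delivery.
 */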
static
int rxr_msg_handle_unexp_match(struct rxr_ep *ep,
			       struct rxr_rx_entry *rx_entry,
			       uint64_t tag, uint64_t ignore,
			       void *context, fi_addr_t addr,
			       uint32_t op, uint64_t flags)
{
	struct rxr_pkt_entry *pkt_entry;
	uint64_t data_len;

	rx_entry->fi_flags = flags;
	rx_entry->ignore = ignore;
	rx_entry->state = RXR_RX_MATCHED;

	pkt_entry = rx_entry->unexp_pkt;
	rx_entry->unexp_pkt = NULL;
	data_len = rxr_pkt_rtm_total_len(pkt_entry);

	rx_entry->cq_entry.op_context = context;
	/*
	 * We do not expect a recv buffer from the application for FI_DISCARD,
	 * hence set it to NULL.
	 */
	if (OFI_UNLIKELY(flags & FI_DISCARD)) {
		rx_entry->cq_entry.buf = NULL;
		rx_entry->cq_entry.len = data_len;
	} else {
		rx_entry->cq_entry.buf = rx_entry->iov[0].iov_base;
		data_len = MIN(rx_entry->total_len,
			       ofi_total_iov_len(rx_entry->iov, rx_entry->iov_count));
		rx_entry->cq_entry.len = data_len;
	}

	rx_entry->cq_entry.flags = (FI_RECV | FI_MSG);

	if (op == ofi_op_tagged) {
		rx_entry->cq_entry.flags |= FI_TAGGED;
		rx_entry->cq_entry.tag = rx_entry->tag;
		rx_entry->ignore = ignore;
	} else {
		rx_entry->cq_entry.tag = 0;
		rx_entry->ignore = ~0;
	}

	return rxr_pkt_proc_matched_rtm(ep, rx_entry, pkt_entry);
}

/*
 * Search the unexpected message list for a matching message and process it
 * if found. Returns 0 if the message is processed, -FI_ENOMSG if no match
 * is found.
 */
static
int rxr_msg_proc_unexp_msg_list(struct rxr_ep *ep, const struct fi_msg *msg,
				uint64_t tag, uint64_t ignore, uint32_t op, uint64_t flags,
				struct rxr_rx_entry *posted_entry)
{
	struct rxr_match_info match_info;
	struct dlist_entry *match;
	struct rxr_rx_entry *rx_entry;
	dlist_func_t *match_func;
	int ret;

	if (op == ofi_op_tagged) {
		if (ep->util_ep.caps & FI_DIRECTED_RECV)
			match_func = &rxr_msg_match_unexp_tagged;
		else
			match_func = &rxr_msg_match_unexp_tagged_anyaddr;

		match_info.addr = msg->addr;
		match_info.tag = tag;
		match_info.ignore = ignore;
		match = dlist_remove_first_match(&ep->rx_unexp_tagged_list,
						 match_func,
						 (void *)&match_info);
	} else {
		if (ep->util_ep.caps & FI_DIRECTED_RECV)
			match_func = &rxr_msg_match_unexp;
		else
			match_func = &rxr_msg_match_unexp_anyaddr;

		match_info.addr = msg->addr;
		match = dlist_remove_first_match(&ep->rx_unexp_list,
						 match_func,
						 (void *)&match_info);
	}

	if (!match)
		return -FI_ENOMSG;

	rx_entry = container_of(match, struct rxr_rx_entry, entry);

	/*
	 * Initialize the matched entry as a multi-recv consumer if the posted
	 * buffer is a multi-recv buffer.
	 */
	if (posted_entry) {
		/*
		 * rxr_ep_split_rx_entry will set up the rx_entry iov and count.
		 */
		rx_entry = rxr_ep_split_rx_entry(ep, posted_entry, rx_entry,
						 rx_entry->unexp_pkt);
		if (OFI_UNLIKELY(!rx_entry)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"RX entries exhausted.\n");
			return -FI_ENOBUFS;
		}
	} else {
		memcpy(rx_entry->iov, msg->msg_iov, sizeof(*rx_entry->iov) * msg->iov_count);
		rx_entry->iov_count = msg->iov_count;
	}

	if (msg->desc)
		memcpy(rx_entry->desc, msg->desc, sizeof(void *) * msg->iov_count);

	FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
	       "Match found in unexp list for a posted recv msg_id: %" PRIu32
	       " total_len: %" PRIu64 " tag: %lx\n",
	       rx_entry->msg_id, rx_entry->total_len, rx_entry->tag);

	ret = rxr_msg_handle_unexp_match(ep, rx_entry, tag, ignore,
					 msg->context, msg->addr, op, flags);
	return ret;
}

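/*
 * A posted multi-recv buffer can accept another message as long as the
 * remaining space in its iov is at least the endpoint's minimum multi-recv
 * size.
 */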
bool rxr_msg_multi_recv_buffer_available(struct rxr_ep *ep,
					 struct rxr_rx_entry *rx_entry)
{
	assert(rx_entry->fi_flags & FI_MULTI_RECV);
	assert(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED);

	return (ofi_total_iov_len(rx_entry->iov, rx_entry->iov_count)
		>= ep->min_multi_recv_size);
}

static inline
bool rxr_msg_multi_recv_buffer_complete(struct rxr_ep *ep,
					struct rxr_rx_entry *rx_entry)
{
	assert(rx_entry->fi_flags & FI_MULTI_RECV);
	assert(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED);

	return (!rxr_msg_multi_recv_buffer_available(ep, rx_entry) &&
		dlist_empty(&rx_entry->multi_recv_consumers));
}

void rxr_msg_multi_recv_free_posted_entry(struct rxr_ep *ep,
					  struct rxr_rx_entry *rx_entry)
{
	assert(!(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED));

	if ((rx_entry->rxr_flags & RXR_MULTI_RECV_CONSUMER) &&
	    rxr_msg_multi_recv_buffer_complete(ep, rx_entry->master_entry))
		rxr_release_rx_entry(ep, rx_entry->master_entry);
}

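/*
 * Post a multi-recv buffer: allocate a posted rx_entry that only tracks the
 * application buffer, drain any matching unexpected messages into it, and
 * then queue the entry on the rx_list while the remaining space stays above
 * the minimum multi-recv size.
 */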
static
ssize_t rxr_msg_multi_recv(struct rxr_ep *rxr_ep, const struct fi_msg *msg,
			   uint64_t tag, uint64_t ignore, uint32_t op, uint64_t flags)
{
	struct rxr_rx_entry *rx_entry;
	int ret = 0;

	if ((ofi_total_iov_len(msg->msg_iov, msg->iov_count)
	     < rxr_ep->min_multi_recv_size) || op != ofi_op_msg)
		return -FI_EINVAL;

	/*
	 * Always get a new rx_entry of type RXR_MULTI_RECV_POSTED when in the
	 * multi-recv path. The posted entry will not be used for receiving
	 * messages but will be used for tracking the application's buffer and
	 * for deciding when to write the completion that releases the buffer.
	 */
	rx_entry = rxr_ep_get_rx_entry(rxr_ep, msg, tag, ignore, op, flags);
	if (OFI_UNLIKELY(!rx_entry)) {
		rxr_ep_progress_internal(rxr_ep);
		return -FI_EAGAIN;
	}

	rx_entry->rxr_flags |= RXR_MULTI_RECV_POSTED;
	dlist_init(&rx_entry->multi_recv_consumers);
	dlist_init(&rx_entry->multi_recv_entry);

	while (!dlist_empty(&rxr_ep->rx_unexp_list)) {
		ret = rxr_msg_proc_unexp_msg_list(rxr_ep, msg, tag,
						  ignore, op, flags, rx_entry);

		if (!rxr_msg_multi_recv_buffer_available(rxr_ep, rx_entry)) {
			/*
			 * The multi-recv buffer was consumed by short,
			 * unexpected messages; free the posted rx_entry.
			 */
			if (rxr_msg_multi_recv_buffer_complete(rxr_ep, rx_entry))
				rxr_release_rx_entry(rxr_ep, rx_entry);
			/*
			 * Otherwise the multi-recv buffer has been consumed,
			 * but we are waiting on long message completions.
			 * The last message completion will free the posted
			 * rx_entry.
			 */
			if (ret == -FI_ENOMSG)
				return 0;
			return ret;
		}

		if (ret == -FI_ENOMSG) {
			ret = 0;
			break;
		}

		/*
		 * An error was encountered while processing unexpected
		 * messages, but there is still buffer space available. Add
		 * the posted entry to the rx_list.
		 */
		if (ret)
			break;
	}

	dlist_insert_tail(&rx_entry->entry, &rxr_ep->rx_list);
	return ret;
}

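/*
 * Called when a multi-recv consumer entry completes: detach it from the
 * posted (master) entry and, if the posted buffer is fully consumed with no
 * other consumers outstanding, flag this completion with FI_MULTI_RECV so
 * the application knows the buffer has been released.
 */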
void rxr_msg_multi_recv_handle_completion(struct rxr_ep *ep,
					  struct rxr_rx_entry *rx_entry)
{
	assert(!(rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED) &&
	       (rx_entry->rxr_flags & RXR_MULTI_RECV_CONSUMER));

	dlist_remove(&rx_entry->multi_recv_entry);
	rx_entry->rxr_flags &= ~RXR_MULTI_RECV_CONSUMER;

	if (!rxr_msg_multi_recv_buffer_complete(ep, rx_entry->master_entry))
		return;

	/*
	 * Buffer is consumed and all messages have been received. Update the
	 * last message to release the application buffer.
	 */
	rx_entry->cq_entry.flags |= FI_MULTI_RECV;
}

/*
 * Check the unexpected message list for a match; if none is found, create
 * an rx entry and add it to the posted receive list.
 */
static
ssize_t rxr_msg_generic_recv(struct fid_ep *ep, const struct fi_msg *msg,
			     uint64_t tag, uint64_t ignore, uint32_t op,
			     uint64_t flags)
{
	ssize_t ret = 0;
	struct rxr_ep *rxr_ep;
	struct dlist_entry *unexp_list;
	struct rxr_rx_entry *rx_entry;
	uint64_t rx_op_flags;

	FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
	       "%s: iov_len: %lu tag: %lx ignore: %lx op: %x flags: %lx\n",
	       __func__, ofi_total_iov_len(msg->msg_iov, msg->iov_count), tag, ignore,
	       op, flags);

	rxr_ep = container_of(ep, struct rxr_ep, util_ep.ep_fid.fid);

	assert(msg->iov_count <= rxr_ep->rx_iov_limit);

	rxr_perfset_start(rxr_ep, perf_rxr_recv);

	assert(rxr_ep->util_ep.rx_msg_flags == 0 || rxr_ep->util_ep.rx_msg_flags == FI_COMPLETION);
	rx_op_flags = rxr_ep->util_ep.rx_op_flags;
	if (rxr_ep->util_ep.rx_msg_flags == 0)
		rx_op_flags &= ~FI_COMPLETION;
	flags = flags | rx_op_flags;

	fastlock_acquire(&rxr_ep->util_ep.lock);
	if (OFI_UNLIKELY(is_rx_res_full(rxr_ep))) {
		ret = -FI_EAGAIN;
		goto out;
	}

	if (flags & FI_MULTI_RECV) {
		ret = rxr_msg_multi_recv(rxr_ep, msg, tag, ignore, op, flags);
		goto out;
	}

	unexp_list = (op == ofi_op_tagged) ? &rxr_ep->rx_unexp_tagged_list :
		     &rxr_ep->rx_unexp_list;

	if (!dlist_empty(unexp_list)) {
		ret = rxr_msg_proc_unexp_msg_list(rxr_ep, msg, tag,
						  ignore, op, flags, NULL);

		if (ret != -FI_ENOMSG)
			goto out;
		ret = 0;
	}

	rx_entry = rxr_ep_get_rx_entry(rxr_ep, msg, tag,
				       ignore, op, flags);

	if (OFI_UNLIKELY(!rx_entry)) {
		ret = -FI_EAGAIN;
		rxr_ep_progress_internal(rxr_ep);
		goto out;
	}

	if (op == ofi_op_tagged)
		dlist_insert_tail(&rx_entry->entry, &rxr_ep->rx_tagged_list);
	else
		dlist_insert_tail(&rx_entry->entry, &rxr_ep->rx_list);

out:
	fastlock_release(&rxr_ep->util_ep.lock);

	rxr_perfset_end(rxr_ep, perf_rxr_recv);
	return ret;
}

static
ssize_t rxr_msg_discard_trecv(struct rxr_ep *ep,
			      struct rxr_rx_entry *rx_entry,
			      const struct fi_msg_tagged *msg,
			      int64_t flags)
{
	int ret;

	if ((flags & FI_DISCARD) && !(flags & (FI_PEEK | FI_CLAIM)))
		return -FI_EINVAL;

	rx_entry->fi_flags |= FI_DISCARD;
	rx_entry->rxr_flags |= RXR_RECV_CANCEL;
	ret = ofi_cq_write(ep->util_ep.rx_cq, msg->context,
			   FI_TAGGED | FI_RECV | FI_MSG,
			   0, NULL, rx_entry->cq_entry.data,
			   rx_entry->cq_entry.tag);
	rxr_rm_rx_cq_check(ep, ep->util_ep.rx_cq);
	return ret;
}

static
ssize_t rxr_msg_claim_trecv(struct fid_ep *ep_fid,
			    const struct fi_msg_tagged *msg,
			    int64_t flags)
{
	ssize_t ret = 0;
	struct rxr_ep *ep;
	struct rxr_rx_entry *rx_entry;
	struct fi_context *context;

	ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);
	fastlock_acquire(&ep->util_ep.lock);

	context = (struct fi_context *)msg->context;
	rx_entry = (struct rxr_rx_entry *)context->internal[0];

	if (flags & FI_DISCARD) {
		ret = rxr_msg_discard_trecv(ep, rx_entry, msg, flags);
		if (OFI_UNLIKELY(ret))
			goto out;
	}

	/*
	 * Handle the unexpected match entry even for a discarded entry, since
	 * we are sinking the message in that case.
	 */
	memcpy(rx_entry->iov, msg->msg_iov,
	       sizeof(*msg->msg_iov) * msg->iov_count);
	rx_entry->iov_count = msg->iov_count;

	ret = rxr_msg_handle_unexp_match(ep, rx_entry, msg->tag,
					 msg->ignore, msg->context,
					 msg->addr, ofi_op_tagged, flags);

out:
	fastlock_release(&ep->util_ep.lock);
	return ret;
}

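/*
 * Implement FI_PEEK for tagged receives: search the unexpected tagged list
 * for a match without consuming it. With FI_CLAIM the matched entry is saved
 * in the fi_context for a later claim; with FI_DISCARD the message is sunk;
 * otherwise a completion describing the match (or a peek error if none) is
 * written and the message stays queued.
 */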
static
ssize_t rxr_msg_peek_trecv(struct fid_ep *ep_fid,
			   const struct fi_msg_tagged *msg,
			   uint64_t flags)
{
	ssize_t ret = 0;
	struct rxr_ep *ep;
	struct dlist_entry *match;
	dlist_func_t *match_func;
	struct rxr_match_info match_info;
	struct rxr_rx_entry *rx_entry;
	struct fi_context *context;
	struct rxr_pkt_entry *pkt_entry;
	size_t data_len;
	int64_t tag;

	ep = container_of(ep_fid, struct rxr_ep, util_ep.ep_fid.fid);

	fastlock_acquire(&ep->util_ep.lock);

	rxr_ep_progress_internal(ep);
	match_info.addr = msg->addr;
	match_info.tag = msg->tag;
	match_info.ignore = msg->ignore;

	if (ep->util_ep.caps & FI_DIRECTED_RECV)
		match_func = &rxr_msg_match_unexp_tagged;
	else
		match_func = &rxr_msg_match_unexp_tagged_anyaddr;

	match = dlist_find_first_match(&ep->rx_unexp_tagged_list,
				       match_func,
				       (void *)&match_info);
	if (!match) {
		FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
		       "Message not found addr: %" PRIu64
		       " tag: %lx ignore %lx\n", msg->addr, msg->tag,
		       msg->ignore);
		ret = ofi_cq_write_error_peek(ep->util_ep.rx_cq, msg->tag,
					      msg->context);
		goto out;
	}

	rx_entry = container_of(match, struct rxr_rx_entry, entry);
	context = (struct fi_context *)msg->context;
	if (flags & FI_CLAIM) {
		context->internal[0] = rx_entry;
		dlist_remove(match);
	} else if (flags & FI_DISCARD) {
		dlist_remove(match);

		ret = rxr_msg_discard_trecv(ep, rx_entry, msg, flags);
		if (ret)
			goto out;

		memcpy(rx_entry->iov, msg->msg_iov,
		       sizeof(*msg->msg_iov) * msg->iov_count);
		rx_entry->iov_count = msg->iov_count;

		ret = rxr_msg_handle_unexp_match(ep, rx_entry,
						 msg->tag, msg->ignore,
						 msg->context, msg->addr,
						 ofi_op_tagged, flags);

		goto out;
	}

	pkt_entry = rx_entry->unexp_pkt;
	data_len = rxr_pkt_rtm_total_len(pkt_entry);
	tag = rxr_pkt_rtm_tag(pkt_entry);

	if (ep->util_ep.caps & FI_SOURCE)
		ret = ofi_cq_write_src(ep->util_ep.rx_cq, context,
				       FI_TAGGED | FI_RECV,
				       data_len, NULL,
				       rx_entry->cq_entry.data, tag,
				       rx_entry->addr);
	else
		ret = ofi_cq_write(ep->util_ep.rx_cq, context,
				   FI_TAGGED | FI_RECV,
				   data_len, NULL,
				   rx_entry->cq_entry.data, tag);
	rxr_rm_rx_cq_check(ep, ep->util_ep.rx_cq);
out:
	fastlock_release(&ep->util_ep.lock);
	return ret;
}

/**
 *   Non-tagged receive ops
 */
static
ssize_t rxr_msg_recvmsg(struct fid_ep *ep_fid, const struct fi_msg *msg,
			uint64_t flags)
{
	return rxr_msg_generic_recv(ep_fid, msg, 0, 0, ofi_op_msg, flags);
}

static
ssize_t rxr_msg_recv(struct fid_ep *ep, void *buf, size_t len,
		     void *desc, fi_addr_t src_addr, void *context)
{
	struct fi_msg msg;
	struct iovec msg_iov;

	memset(&msg, 0, sizeof(msg));
	msg_iov.iov_base = buf;
	msg_iov.iov_len = len;

	msg.msg_iov = &msg_iov;
	msg.desc = &desc;
	msg.iov_count = 1;
	msg.addr = src_addr;
	msg.context = context;
	msg.data = 0;

	return rxr_msg_recvmsg(ep, &msg, 0);
}

static
ssize_t rxr_msg_recvv(struct fid_ep *ep, const struct iovec *iov,
		      void **desc, size_t count, fi_addr_t src_addr,
		      void *context)
{
	struct fi_msg msg;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = iov;
	msg.desc = desc;
	msg.iov_count = count;
	msg.addr = src_addr;
	msg.context = context;
	msg.data = 0;

	return rxr_msg_recvmsg(ep, &msg, 0);
}

/**
 *   Tagged receive ops functions
 */
static
ssize_t rxr_msg_trecv(struct fid_ep *ep_fid, void *buf, size_t len, void *desc,
		      fi_addr_t src_addr, uint64_t tag, uint64_t ignore,
		      void *context)
{
	struct fi_msg msg;
	struct iovec msg_iov;

	msg_iov.iov_base = (void *)buf;
	msg_iov.iov_len = len;

	msg.msg_iov = &msg_iov;
	msg.iov_count = 1;
	msg.addr = src_addr;
	msg.context = context;
	msg.desc = &desc;

	return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, 0);
}

static
ssize_t rxr_msg_trecvv(struct fid_ep *ep_fid, const struct iovec *iov,
		       void **desc, size_t count, fi_addr_t src_addr,
		       uint64_t tag, uint64_t ignore, void *context)
{
	struct fi_msg msg;

	msg.msg_iov = iov;
	msg.iov_count = count;
	msg.addr = src_addr;
	msg.desc = desc;
	msg.context = context;

	return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, 0);
}

static
ssize_t rxr_msg_trecvmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *tagmsg,
			 uint64_t flags)
{
	ssize_t ret;
	struct fi_msg msg;

	if (flags & FI_PEEK) {
		ret = rxr_msg_peek_trecv(ep_fid, tagmsg, flags);
		goto out;
	} else if (flags & FI_CLAIM) {
		ret = rxr_msg_claim_trecv(ep_fid, tagmsg, flags);
		goto out;
	}

	msg.msg_iov = tagmsg->msg_iov;
	msg.iov_count = tagmsg->iov_count;
	msg.addr = tagmsg->addr;
	msg.desc = tagmsg->desc;
	msg.context = tagmsg->context;

	ret = rxr_msg_generic_recv(ep_fid, &msg, tagmsg->tag, tagmsg->ignore,
				   ofi_op_tagged, flags);

out:
	return ret;
}

/**
 * Ops structures used by rxr_endpoint()
 */
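/*
 * Illustrative usage sketch (not part of this file): assuming an RxR/EFA
 * endpoint "ep" created with fi_endpoint() and enabled, and a destination
 * address "dest" returned by fi_av_insert(), an application call such as
 *
 *     char buf[64];
 *     ssize_t err = fi_send(ep, buf, sizeof(buf), NULL, dest, NULL);
 *
 * dispatches through rxr_ops_msg.send (rxr_msg_send), and a tagged receive
 * posted with fi_trecv() dispatches through rxr_ops_tagged.recv
 * (rxr_msg_trecv).
 */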
struct fi_ops_msg rxr_ops_msg = {
	.size = sizeof(struct fi_ops_msg),
	.send = rxr_msg_send,
	.sendv = rxr_msg_sendv,
	.sendmsg = rxr_msg_sendmsg,
	.senddata = rxr_msg_senddata,
	.inject = rxr_msg_inject,
	.injectdata = rxr_msg_injectdata,
	.recv = rxr_msg_recv,
	.recvv = rxr_msg_recvv,
	.recvmsg = rxr_msg_recvmsg,
};

struct fi_ops_tagged rxr_ops_tagged = {
	.size = sizeof(struct fi_ops_tagged),
	.send = rxr_msg_tsend,
	.sendv = rxr_msg_tsendv,
	.sendmsg = rxr_msg_tsendmsg,
	.senddata = rxr_msg_tsenddata,
	.inject = rxr_msg_tinject,
	.injectdata = rxr_msg_tinjectdata,
	.recv = rxr_msg_trecv,
	.recvv = rxr_msg_trecvv,
	.recvmsg = rxr_msg_trecvmsg,
};