/*
 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates.
 * All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <ofi_iov.h>
#include <ofi_recvwin.h>
#include "rxr.h"
#include "rxr_rma.h"
#include "rxr_msg.h"
#include "rxr_cntr.h"
#include "rxr_atomic.h"
#include "efa.h"

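/*
 * Convert provider-specific error information for this CQ into a string by
 * forwarding the request to the core provider's CQ. The first endpoint bound
 * to this CQ is used to locate the underlying RDM CQ.
 */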
static const char *rxr_cq_strerror(struct fid_cq *cq_fid, int prov_errno,
				   const void *err_data, char *buf, size_t len)
{
	struct fid_list_entry *fid_entry;
	struct util_ep *util_ep;
	struct util_cq *cq;
	struct rxr_ep *ep;
	const char *str;

	cq = container_of(cq_fid, struct util_cq, cq_fid);

	fastlock_acquire(&cq->ep_list_lock);
	assert(!dlist_empty(&cq->ep_list));
	fid_entry = container_of(cq->ep_list.next,
				 struct fid_list_entry, entry);
	util_ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
	ep = container_of(util_ep, struct rxr_ep, util_ep);

	str = fi_cq_strerror(ep->rdm_cq, prov_errno, err_data, buf, len);
	fastlock_release(&cq->ep_list_lock);
	return str;
}

/*
 * Teardown rx_entry and write an error cq entry. With our current protocol we
 * will only encounter an RX error when sending a queued REQ or CTS packet or
 * if we are sending a CTS message. Because of this, the sender will not send
 * any additional data packets if the receiver encounters an error. If there is
 * a scenario in the future where the sender will continue to send data packets
 * we need to prevent rx_id mismatch. Ideally, we should add a NACK message and
 * tear down both RX and TX entries, although whatever caused the error may
 * prevent that.
 *
 * TODO: add a NACK message to tear down state on sender side
 */
int rxr_cq_handle_rx_error(struct rxr_ep *ep, struct rxr_rx_entry *rx_entry,
			   ssize_t prov_errno)
{
	struct fi_cq_err_entry err_entry;
	struct util_cq *util_cq;
	struct dlist_entry *tmp;
	struct rxr_pkt_entry *pkt_entry;

	memset(&err_entry, 0, sizeof(err_entry));

	util_cq = ep->util_ep.rx_cq;

	err_entry.err = FI_EIO;
	err_entry.prov_errno = (int)prov_errno;

	switch (rx_entry->state) {
	case RXR_RX_INIT:
	case RXR_RX_UNEXP:
		dlist_remove(&rx_entry->entry);
		break;
	case RXR_RX_MATCHED:
		break;
	case RXR_RX_RECV:
#if ENABLE_DEBUG
		dlist_remove(&rx_entry->rx_pending_entry);
#endif
		break;
	case RXR_RX_QUEUED_CTRL:
	case RXR_RX_QUEUED_CTS_RNR:
	case RXR_RX_QUEUED_EOR:
		dlist_remove(&rx_entry->queued_entry);
		break;
	default:
		FI_WARN(&rxr_prov, FI_LOG_CQ, "rx_entry unknown state %d\n",
			rx_entry->state);
		assert(0 && "rx_entry unknown state");
	}

	dlist_foreach_container_safe(&rx_entry->queued_pkts,
				     struct rxr_pkt_entry,
				     pkt_entry, entry, tmp)
		rxr_pkt_entry_release_tx(ep, pkt_entry);

	if (rx_entry->unexp_pkt) {
		rxr_pkt_entry_release_rx(ep, rx_entry->unexp_pkt);
		rx_entry->unexp_pkt = NULL;
	}

	if (rx_entry->fi_flags & FI_MULTI_RECV)
		rxr_msg_multi_recv_handle_completion(ep, rx_entry);

	err_entry.flags = rx_entry->cq_entry.flags;
	if (rx_entry->state != RXR_RX_UNEXP)
		err_entry.op_context = rx_entry->cq_entry.op_context;
	err_entry.buf = rx_entry->cq_entry.buf;
	err_entry.data = rx_entry->cq_entry.data;
	err_entry.tag = rx_entry->cq_entry.tag;

	rxr_msg_multi_recv_free_posted_entry(ep, rx_entry);

	FI_WARN(&rxr_prov, FI_LOG_CQ,
		"rxr_cq_handle_rx_error: err: %d, prov_err: %s (%d)\n",
		err_entry.err, fi_strerror(-err_entry.prov_errno),
		err_entry.prov_errno);

	/*
	 * TODO: We can't free the rx_entry as we may receive additional
	 * packets for this entry. Add ref counting so the rx_entry can safely
	 * be freed once all packets are accounted for.
	 */
	//rxr_release_rx_entry(ep, rx_entry);

	efa_cntr_report_error(&ep->util_ep, err_entry.flags);
	return ofi_cq_write_error(util_cq, &err_entry);
}

/*
 * Teardown tx_entry and write an error cq entry. With our current protocol the
 * receiver will only send a CTS once the window is exhausted, meaning that all
 * data packets for that window will have been received successfully. This
 * means that the receiver will not send any CTS packets if the sender
 * encounters an error sending data packets. If that changes in the future we
 * will need to be careful to prevent tx_id mismatch.
 *
 * TODO: add NACK message to tear down receive side state
 */
int rxr_cq_handle_tx_error(struct rxr_ep *ep, struct rxr_tx_entry *tx_entry,
			   ssize_t prov_errno)
{
	struct fi_cq_err_entry err_entry;
	struct util_cq *util_cq;
	uint32_t api_version;
	struct dlist_entry *tmp;
	struct rxr_pkt_entry *pkt_entry;

	memset(&err_entry, 0, sizeof(err_entry));

	util_cq = ep->util_ep.tx_cq;
	api_version = util_cq->domain->fabric->fabric_fid.api_version;

	err_entry.err = FI_EIO;
	err_entry.prov_errno = (int)prov_errno;

	switch (tx_entry->state) {
	case RXR_TX_REQ:
		break;
	case RXR_TX_SEND:
		dlist_remove(&tx_entry->entry);
		break;
	case RXR_TX_QUEUED_CTRL:
	case RXR_TX_QUEUED_SHM_RMA:
	case RXR_TX_QUEUED_REQ_RNR:
	case RXR_TX_QUEUED_DATA_RNR:
		dlist_remove(&tx_entry->queued_entry);
		break;
	case RXR_TX_SENT_READRSP:
	case RXR_TX_WAIT_READ_FINISH:
		break;
	default:
		FI_WARN(&rxr_prov, FI_LOG_CQ, "tx_entry unknown state %d\n",
			tx_entry->state);
		assert(0 && "tx_entry unknown state");
	}

	dlist_foreach_container_safe(&tx_entry->queued_pkts,
				     struct rxr_pkt_entry,
				     pkt_entry, entry, tmp)
		rxr_pkt_entry_release_tx(ep, pkt_entry);

	err_entry.flags = tx_entry->cq_entry.flags;
	err_entry.op_context = tx_entry->cq_entry.op_context;
	err_entry.buf = tx_entry->cq_entry.buf;
	err_entry.data = tx_entry->cq_entry.data;
	err_entry.tag = tx_entry->cq_entry.tag;
	if (FI_VERSION_GE(api_version, FI_VERSION(1, 5)))
		err_entry.err_data_size = 0;

	FI_WARN(&rxr_prov, FI_LOG_CQ,
		"rxr_cq_handle_tx_error: err: %d, prov_err: %s (%d)\n",
		err_entry.err, fi_strerror(-err_entry.prov_errno),
		err_entry.prov_errno);

	/*
	 * TODO: We can't free the tx_entry as we may receive a control packet
	 * for this entry. Add ref counting so the tx_entry can safely be
	 * freed once all packets are accounted for.
	 */
	//rxr_release_tx_entry(ep, tx_entry);

	efa_cntr_report_error(&ep->util_ep, tx_entry->cq_entry.flags);
	return ofi_cq_write_error(util_cq, &err_entry);
}

/*
 * Queue a packet on the appropriate list when an RNR error is received.
 */
static inline void rxr_cq_queue_pkt(struct rxr_ep *ep,
				    struct dlist_entry *list,
				    struct rxr_pkt_entry *pkt_entry)
{
	struct rxr_peer *peer;

	peer = rxr_ep_get_peer(ep, pkt_entry->addr);

	/*
	 * Queue the packet if it has not been retransmitted yet.
	 */
	if (pkt_entry->state != RXR_PKT_ENTRY_RNR_RETRANSMIT) {
		pkt_entry->state = RXR_PKT_ENTRY_RNR_RETRANSMIT;
		peer->rnr_queued_pkt_cnt++;
		goto queue_pkt;
	}

	/*
	 * Otherwise, increase the backoff if the peer is not already in
	 * backoff. Reset the timer when starting backoff or if another RNR for
	 * a retransmitted packet is received while waiting for the timer to
	 * expire.
	 */
	peer->rnr_ts = ofi_gettime_us();
	if (peer->flags & RXR_PEER_IN_BACKOFF)
		goto queue_pkt;

	peer->flags |= RXR_PEER_IN_BACKOFF;

	if (!peer->timeout_interval) {
		if (rxr_env.timeout_interval)
			peer->timeout_interval = rxr_env.timeout_interval;
		else
			peer->timeout_interval = MAX(RXR_RAND_MIN_TIMEOUT,
						     rand() %
						     RXR_RAND_MAX_TIMEOUT);

		peer->rnr_timeout_exp = 1;
		FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
		       "initializing backoff timeout for peer: %" PRIu64
		       " timeout: %d rnr_queued_pkts: %d\n",
		       pkt_entry->addr, peer->timeout_interval,
		       peer->rnr_queued_pkt_cnt);
	} else {
		/* Only backoff once per peer per progress thread loop. */
		if (!(peer->flags & RXR_PEER_BACKED_OFF)) {
			peer->flags |= RXR_PEER_BACKED_OFF;
			peer->rnr_timeout_exp++;
			FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
			       "increasing backoff for peer: %" PRIu64
			       " rnr_timeout_exp: %d rnr_queued_pkts: %d\n",
			       pkt_entry->addr, peer->rnr_timeout_exp,
			       peer->rnr_queued_pkt_cnt);
		}
	}
	dlist_insert_tail(&peer->rnr_entry,
			  &ep->peer_backoff_list);

queue_pkt:
#if ENABLE_DEBUG
	dlist_remove(&pkt_entry->dbg_entry);
#endif
	dlist_insert_tail(&pkt_entry->entry, list);
}

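/*
 * Handle an error returned while reading the core provider's CQ. RNR errors
 * (-FI_EAGAIN) requeue the offending packet for retransmission and place the
 * peer in backoff; errors on HANDSHAKE packets are squelched; other errors
 * tear down the associated tx/rx entry and write an error completion, or
 * write an error event to the EQ when no entry can be identified.
 */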
int rxr_cq_handle_cq_error(struct rxr_ep *ep, ssize_t err)
{
	struct fi_cq_err_entry err_entry;
	struct rxr_pkt_entry *pkt_entry;
	struct rxr_rx_entry *rx_entry;
	struct rxr_tx_entry *tx_entry;
	struct rxr_peer *peer;
	ssize_t ret;

	memset(&err_entry, 0, sizeof(err_entry));

	/*
	 * If the cq_read failed with another error besides -FI_EAVAIL or
	 * the cq_readerr fails we don't know if this is an rx or tx error.
	 * We'll write an error entry to the event queue instead.
	 */

	err_entry.err = FI_EIO;
	err_entry.prov_errno = (int)err;

	if (err != -FI_EAVAIL) {
		FI_WARN(&rxr_prov, FI_LOG_CQ, "fi_cq_read: %s\n",
			fi_strerror(-err));
		goto write_err;
	}

	ret = fi_cq_readerr(ep->rdm_cq, &err_entry, 0);
	if (ret != 1) {
		if (ret < 0) {
			FI_WARN(&rxr_prov, FI_LOG_CQ, "fi_cq_readerr: %s\n",
				fi_strerror(-ret));
			err_entry.prov_errno = ret;
		} else {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"fi_cq_readerr unexpected size %zd expected %zu\n",
				ret, sizeof(err_entry));
			err_entry.prov_errno = -FI_EIO;
		}
		goto write_err;
	}

	if (err_entry.err != -FI_EAGAIN)
		OFI_CQ_STRERROR(&rxr_prov, FI_LOG_WARN, FI_LOG_CQ, ep->rdm_cq,
				&err_entry);

	pkt_entry = (struct rxr_pkt_entry *)err_entry.op_context;
	peer = rxr_ep_get_peer(ep, pkt_entry->addr);

	/*
	 * A handshake send could fail at the core provider if the peer endpoint
	 * is shutdown soon after it receives a send completion for the REQ
	 * packet that included src_address. The handshake itself is irrelevant if
	 * that happens, so just squelch this error entry and move on without
	 * writing an error completion or event to the application.
	 */
	if (rxr_get_base_hdr(pkt_entry->pkt)->type == RXR_HANDSHAKE_PKT) {
		FI_WARN(&rxr_prov, FI_LOG_CQ,
			"Squelching error CQE for RXR_HANDSHAKE_PKT\n");
		/*
		 * HANDSHAKE packets do not have an associated rx/tx entry. Use
		 * the flags instead to determine if this is a send or recv.
		 */
		if (err_entry.flags & FI_SEND) {
			rxr_ep_dec_tx_pending(ep, peer, 1);
			rxr_pkt_entry_release_tx(ep, pkt_entry);
		} else if (err_entry.flags & FI_RECV) {
			rxr_pkt_entry_release_rx(ep, pkt_entry);
		} else {
			assert(0 && "unknown err_entry flags in HANDSHAKE packet");
		}
		return 0;
	}

	if (!pkt_entry->x_entry) {
		/*
		 * A NULL x_entry means this is a pkt_entry for a posted recv
		 * buffer. Since we don't have any context besides the error
		 * code, we will write to the eq instead.
		 */
		rxr_pkt_entry_release_rx(ep, pkt_entry);
		goto write_err;
	}

	/*
	 * If x_entry is set, this error is for a sent packet. Decrement the
	 * tx_pending counter and fall through to the rx or tx entry handlers.
	 */
	if (!peer->is_local)
		rxr_ep_dec_tx_pending(ep, peer, 1);
	if (RXR_GET_X_ENTRY_TYPE(pkt_entry) == RXR_TX_ENTRY) {
		tx_entry = (struct rxr_tx_entry *)pkt_entry->x_entry;
		if (err_entry.err != -FI_EAGAIN ||
		    rxr_ep_domain(ep)->resource_mgmt == FI_RM_ENABLED) {
			ret = rxr_cq_handle_tx_error(ep, tx_entry,
						     err_entry.prov_errno);
			rxr_pkt_entry_release_tx(ep, pkt_entry);
			return ret;
		}

		rxr_cq_queue_pkt(ep, &tx_entry->queued_pkts, pkt_entry);
		if (tx_entry->state == RXR_TX_SEND) {
			dlist_remove(&tx_entry->entry);
			tx_entry->state = RXR_TX_QUEUED_DATA_RNR;
			dlist_insert_tail(&tx_entry->queued_entry,
					  &ep->tx_entry_queued_list);
		} else if (tx_entry->state == RXR_TX_REQ) {
			tx_entry->state = RXR_TX_QUEUED_REQ_RNR;
			dlist_insert_tail(&tx_entry->queued_entry,
					  &ep->tx_entry_queued_list);
		}
		return 0;
	} else if (RXR_GET_X_ENTRY_TYPE(pkt_entry) == RXR_RX_ENTRY) {
		rx_entry = (struct rxr_rx_entry *)pkt_entry->x_entry;
		if (err_entry.err != -FI_EAGAIN ||
		    rxr_ep_domain(ep)->resource_mgmt == FI_RM_ENABLED) {
			ret = rxr_cq_handle_rx_error(ep, rx_entry,
						     err_entry.prov_errno);
			rxr_pkt_entry_release_tx(ep, pkt_entry);
			return ret;
		}
		rxr_cq_queue_pkt(ep, &rx_entry->queued_pkts, pkt_entry);
		if (rx_entry->state == RXR_RX_RECV) {
			rx_entry->state = RXR_RX_QUEUED_CTS_RNR;
			dlist_insert_tail(&rx_entry->queued_entry,
					  &ep->rx_entry_queued_list);
		}
		return 0;
	}

	FI_WARN(&rxr_prov, FI_LOG_CQ,
		"%s unknown x_entry state %d\n",
		__func__, RXR_GET_X_ENTRY_TYPE(pkt_entry));
	assert(0 && "unknown x_entry state");
write_err:
	efa_eq_write_error(&ep->util_ep, err_entry.err, err_entry.prov_errno);
	return 0;
}

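/*
 * Write a receive completion to the application CQ, or a truncation error if
 * the message did not fit in the posted buffer, and update the RX counters.
 * The CQ write is skipped for canceled receives and for operations that did
 * not request a completion on a selective-completion endpoint.
 */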
void rxr_cq_write_rx_completion(struct rxr_ep *ep,
				struct rxr_rx_entry *rx_entry)
{
	struct util_cq *rx_cq = ep->util_ep.rx_cq;
	int ret = 0;

	if (OFI_UNLIKELY(rx_entry->cq_entry.len < rx_entry->total_len)) {
		FI_WARN(&rxr_prov, FI_LOG_CQ,
			"Message truncated: tag: %"PRIu64" total_len: %"PRIu64" len: %zu\n",
			rx_entry->cq_entry.tag, rx_entry->total_len,
			rx_entry->cq_entry.len);

		ret = ofi_cq_write_error_trunc(ep->util_ep.rx_cq,
					       rx_entry->cq_entry.op_context,
					       rx_entry->cq_entry.flags,
					       rx_entry->total_len,
					       rx_entry->cq_entry.buf,
					       rx_entry->cq_entry.data,
					       rx_entry->cq_entry.tag,
					       rx_entry->total_len -
					       rx_entry->cq_entry.len);

		rxr_rm_rx_cq_check(ep, rx_cq);

		if (OFI_UNLIKELY(ret))
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Unable to write recv error cq: %s\n",
				fi_strerror(-ret));

		efa_cntr_report_error(&ep->util_ep, rx_entry->cq_entry.flags);
		return;
	}

	if (!(rx_entry->rxr_flags & RXR_RECV_CANCEL) &&
	    (ofi_need_completion(rxr_rx_flags(ep), rx_entry->fi_flags) ||
	     (rx_entry->cq_entry.flags & FI_MULTI_RECV))) {
		FI_DBG(&rxr_prov, FI_LOG_CQ,
		       "Writing recv completion for rx_entry from peer: %"
		       PRIu64 " rx_id: %" PRIu32 " msg_id: %" PRIu32
		       " tag: %lx total_len: %" PRIu64 "\n",
		       rx_entry->addr, rx_entry->rx_id, rx_entry->msg_id,
		       rx_entry->cq_entry.tag, rx_entry->total_len);

		if (ep->util_ep.caps & FI_SOURCE)
			ret = ofi_cq_write_src(rx_cq,
					       rx_entry->cq_entry.op_context,
					       rx_entry->cq_entry.flags,
					       rx_entry->cq_entry.len,
					       rx_entry->cq_entry.buf,
					       rx_entry->cq_entry.data,
					       rx_entry->cq_entry.tag,
					       rx_entry->addr);
		else
			ret = ofi_cq_write(rx_cq,
					   rx_entry->cq_entry.op_context,
					   rx_entry->cq_entry.flags,
					   rx_entry->cq_entry.len,
					   rx_entry->cq_entry.buf,
					   rx_entry->cq_entry.data,
					   rx_entry->cq_entry.tag);

		rxr_rm_rx_cq_check(ep, rx_cq);

		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Unable to write recv completion: %s\n",
				fi_strerror(-ret));
			if (rxr_cq_handle_rx_error(ep, rx_entry, ret))
				assert(0 && "failed to write err cq entry");
			return;
		}
	}

	efa_cntr_report_rx_completion(&ep->util_ep, rx_entry->cq_entry.flags);
}

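/*
 * Handle completion of all incoming data for an rx_entry. Remote RMA writes
 * only notify the CQ/counter when requested; emulated reads complete the
 * tx_entry that initiated them; everything else results in a regular receive
 * completion. The pkt_entry that carried the last data is released here.
 */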
void rxr_cq_handle_rx_completion(struct rxr_ep *ep,
				 struct rxr_pkt_entry *pkt_entry,
				 struct rxr_rx_entry *rx_entry)
{
	struct rxr_tx_entry *tx_entry = NULL;

	if (rx_entry->cq_entry.flags & FI_WRITE) {
		/*
		 * We must be on the remote side of an RMA write: notify the
		 * cq/counter if FI_RMA_EVENT is requested or REMOTE_CQ_DATA
		 * is on.
		 */
		if (rx_entry->cq_entry.flags & FI_REMOTE_CQ_DATA)
			rxr_cq_write_rx_completion(ep, rx_entry);
		else if (ep->util_ep.caps & FI_RMA_EVENT)
			efa_cntr_report_rx_completion(&ep->util_ep, rx_entry->cq_entry.flags);

		rxr_pkt_entry_release_rx(ep, pkt_entry);
		return;
	}

	if (rx_entry->cq_entry.flags & FI_READ) {
		/* Note that for emulated FI_READ there is an rx_entry on
		 * both the initiator side and the remote side.
		 * However, rxr_cq_handle_rx_completion() is only called
		 * on the initiator side. The sequence of events is:
		 *
		 * Initiator side                 Remote side
		 * create tx_entry
		 * create rx_entry
		 * send rtr(with rx_id)
		 *                                receive rtr
		 *                                create rx_entry
		 *                                create tx_entry
		 *                                tx_entry sending data
		 * rx_entry receiving data
		 * receive completed              send completed
		 * handle_rx_completion()         handle_pkt_send_completion()
		 * |->write_tx_completion()       |-> if (FI_RMA_EVENT)
		 *                                         write_rx_completion()
		 *
		 * Although there is an rx_entry on the remote side, that entry
		 * never reaches rxr_cq_handle_rx_completion(). So at this
		 * point we must be on the initiator side, and we
		 *     1. find the corresponding tx_entry
		 *     2. call rxr_cq_write_tx_completion()
		 */
		tx_entry = ofi_bufpool_get_ibuf(ep->tx_entry_pool, rx_entry->rma_loc_tx_id);
		assert(tx_entry->state == RXR_TX_WAIT_READ_FINISH);
		if (tx_entry->fi_flags & FI_COMPLETION) {
			/* Note write_tx_completion() will release tx_entry */
			rxr_cq_write_tx_completion(ep, tx_entry);
		} else {
			efa_cntr_report_tx_completion(&ep->util_ep, tx_entry->cq_entry.flags);
			rxr_release_tx_entry(ep, tx_entry);
		}

		/*
		 * Do not call rxr_release_rx_entry here because the caller
		 * will release it.
		 */
		rxr_pkt_entry_release_rx(ep, pkt_entry);
		return;
	}

	if (rx_entry->fi_flags & FI_MULTI_RECV)
		rxr_msg_multi_recv_handle_completion(ep, rx_entry);

	rxr_cq_write_rx_completion(ep, rx_entry);
	rxr_pkt_entry_release_rx(ep, pkt_entry);
}

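/*
 * Check an incoming RTM/RTA packet against the peer's reordering window.
 * Returns 0 if the message is the next expected one and can be processed
 * immediately, 1 if it arrived out of order and was queued in the receive
 * window, or a negative error code (-FI_EALREADY for a duplicate msg_id,
 * -FI_ENOMEM if the out-of-order copy could not be allocated).
 */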
int rxr_cq_reorder_msg(struct rxr_ep *ep,
		       struct rxr_peer *peer,
		       struct rxr_pkt_entry *pkt_entry)
{
	struct rxr_pkt_entry *ooo_entry;
	struct rxr_pkt_entry *cur_ooo_entry;
	uint32_t msg_id;

	assert(rxr_get_base_hdr(pkt_entry->pkt)->type >= RXR_REQ_PKT_BEGIN);

	msg_id = rxr_pkt_msg_id(pkt_entry);
	/*
	 * TODO: Initialize peer state at the time of AV insertion
	 * where duplicate detection is available.
	 */
	if (!peer->rx_init)
		rxr_ep_peer_init_rx(ep, peer);

#if ENABLE_DEBUG
	if (msg_id != ofi_recvwin_next_exp_id(peer->robuf))
		FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
		       "msg OOO msg_id: %" PRIu32 " expected: %"
		       PRIu32 "\n", msg_id,
		       ofi_recvwin_next_exp_id(peer->robuf));
#endif
	if (ofi_recvwin_is_exp(peer->robuf, msg_id))
		return 0;
	else if (!ofi_recvwin_id_valid(peer->robuf, msg_id))
		return -FI_EALREADY;

	if (OFI_LIKELY(rxr_env.rx_copy_ooo)) {
		assert(pkt_entry->type == RXR_PKT_ENTRY_POSTED);
		ooo_entry = rxr_pkt_entry_clone(ep, ep->rx_ooo_pkt_pool, pkt_entry, RXR_PKT_ENTRY_OOO);
		if (OFI_UNLIKELY(!ooo_entry)) {
			FI_WARN(&rxr_prov, FI_LOG_EP_CTRL,
				"Unable to allocate rx_pkt_entry for OOO msg\n");
			return -FI_ENOMEM;
		}
		rxr_pkt_entry_release_rx(ep, pkt_entry);
	} else {
		ooo_entry = pkt_entry;
	}

	cur_ooo_entry = *ofi_recvwin_get_msg(peer->robuf, msg_id);
	if (cur_ooo_entry) {
		assert(rxr_get_base_hdr(cur_ooo_entry->pkt)->type == RXR_MEDIUM_MSGRTM_PKT ||
		       rxr_get_base_hdr(cur_ooo_entry->pkt)->type == RXR_MEDIUM_TAGRTM_PKT);
		assert(rxr_pkt_msg_id(cur_ooo_entry) == msg_id);
		assert(rxr_pkt_rtm_total_len(cur_ooo_entry) == rxr_pkt_rtm_total_len(ooo_entry));
		rxr_pkt_entry_append(cur_ooo_entry, ooo_entry);
	} else {
		ofi_recvwin_queue_msg(peer->robuf, &ooo_entry, msg_id);
	}

	return 1;
}

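/*
 * Drain the peer's reordering window: process queued packets in msg_id order
 * until an empty slot is found or processing a packet fails. Error
 * completions, if needed, are written by rxr_pkt_proc_rtm_rta().
 */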
void rxr_cq_proc_pending_items_in_recvwin(struct rxr_ep *ep,
					  struct rxr_peer *peer)
{
	struct rxr_pkt_entry *pending_pkt;
	int ret = 0;
	uint32_t msg_id;

	while (1) {
		pending_pkt = *ofi_recvwin_peek(peer->robuf);
		if (!pending_pkt || !pending_pkt->pkt)
			return;

		msg_id = rxr_pkt_msg_id(pending_pkt);
		FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
		       "Processing msg_id %d from robuf\n", msg_id);
		/* rxr_pkt_proc_rtm_rta will write error cq entry if needed */
		ret = rxr_pkt_proc_rtm_rta(ep, pending_pkt);
		*ofi_recvwin_get_next_msg(peer->robuf) = NULL;
		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Error processing msg_id %d from robuf: %s\n",
				msg_id, fi_strerror(-ret));
			return;
		}
	}
}

/* Handle two scenarios:
 *  1. RMA writes with immediate data at the remote endpoint,
 *  2. atomic completion on the requester.
 * Write a completion for both.
 */
void rxr_cq_handle_shm_completion(struct rxr_ep *ep, struct fi_cq_data_entry *cq_entry, fi_addr_t src_addr)
{
	struct util_cq *target_cq;
	int ret;

	if (cq_entry->flags & FI_ATOMIC) {
		target_cq = ep->util_ep.tx_cq;
	} else {
		assert(cq_entry->flags & FI_REMOTE_CQ_DATA);
		target_cq = ep->util_ep.rx_cq;
	}

	if (ep->util_ep.caps & FI_SOURCE)
		ret = ofi_cq_write_src(target_cq,
				       cq_entry->op_context,
				       cq_entry->flags,
				       cq_entry->len,
				       cq_entry->buf,
				       cq_entry->data,
				       0,
				       src_addr);
	else
		ret = ofi_cq_write(target_cq,
				   cq_entry->op_context,
				   cq_entry->flags,
				   cq_entry->len,
				   cq_entry->buf,
				   cq_entry->data,
				   0);

	rxr_rm_rx_cq_check(ep, target_cq);

	if (OFI_UNLIKELY(ret)) {
		FI_WARN(&rxr_prov, FI_LOG_CQ,
			"Unable to write a cq entry for shm operation: %s\n",
			fi_strerror(-ret));
		efa_eq_write_error(&ep->util_ep, FI_EIO, ret);
	}

	if (cq_entry->flags & FI_ATOMIC) {
		efa_cntr_report_tx_completion(&ep->util_ep, cq_entry->flags);
	} else {
		assert(cq_entry->flags & FI_REMOTE_CQ_DATA);
		efa_cntr_report_rx_completion(&ep->util_ep, cq_entry->flags);
	}
}

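/*
 * Determine whether a send completion should be written to the application
 * CQ for this tx_entry, honoring RXR_NO_COMPLETION and the endpoint's
 * FI_SELECTIVE_COMPLETION setting.
 */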
static inline
bool rxr_cq_need_tx_completion(struct rxr_ep *ep,
			       struct rxr_tx_entry *tx_entry)
{
	if (tx_entry->fi_flags & RXR_NO_COMPLETION)
		return false;

	/*
	 * ep->util_ep.tx_msg_flags is either 0 or FI_COMPLETION, depending on
	 * whether the app specified FI_SELECTIVE_COMPLETION when binding the
	 * CQ. (ep->util_ep.tx_msg_flags was set in ofi_ep_bind_cq())
	 *
	 * If tx_msg_flags is 0, we only write a completion when the app
	 * specifies FI_COMPLETION in flags.
	 */
	return ep->util_ep.tx_msg_flags == FI_COMPLETION ||
	       tx_entry->fi_flags & FI_COMPLETION;
}

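/*
 * Write a send completion for tx_entry if one is needed, update the TX
 * counter, and release the tx_entry.
 */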
void rxr_cq_write_tx_completion(struct rxr_ep *ep,
				struct rxr_tx_entry *tx_entry)
{
	struct util_cq *tx_cq = ep->util_ep.tx_cq;
	int ret;

	if (rxr_cq_need_tx_completion(ep, tx_entry)) {
		FI_DBG(&rxr_prov, FI_LOG_CQ,
		       "Writing send completion for tx_entry to peer: %" PRIu64
		       " tx_id: %" PRIu32 " msg_id: %" PRIu32 " tag: %lx len: %"
		       PRIu64 "\n",
		       tx_entry->addr, tx_entry->tx_id, tx_entry->msg_id,
		       tx_entry->cq_entry.tag, tx_entry->total_len);

		/* TX completions should not send peer address to util_cq */
		if (ep->util_ep.caps & FI_SOURCE)
			ret = ofi_cq_write_src(tx_cq,
					       tx_entry->cq_entry.op_context,
					       tx_entry->cq_entry.flags,
					       tx_entry->cq_entry.len,
					       tx_entry->cq_entry.buf,
					       tx_entry->cq_entry.data,
					       tx_entry->cq_entry.tag,
					       FI_ADDR_NOTAVAIL);
		else
			ret = ofi_cq_write(tx_cq,
					   tx_entry->cq_entry.op_context,
					   tx_entry->cq_entry.flags,
					   tx_entry->cq_entry.len,
					   tx_entry->cq_entry.buf,
					   tx_entry->cq_entry.data,
					   tx_entry->cq_entry.tag);

		rxr_rm_tx_cq_check(ep, tx_cq);

		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Unable to write send completion: %s\n",
				fi_strerror(-ret));
			if (rxr_cq_handle_tx_error(ep, tx_entry, ret))
				assert(0 && "failed to write err cq entry");
			return;
		}
	}

	efa_cntr_report_tx_completion(&ep->util_ep, tx_entry->cq_entry.flags);
	rxr_release_tx_entry(ep, tx_entry);
}

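/*
 * Deregister any memory regions registered for the iovecs of tx_entry.
 * Returns 0 on success, or the error from the first failed fi_close().
 */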
int rxr_tx_entry_mr_dereg(struct rxr_tx_entry *tx_entry)
{
	int i, err = 0;

	for (i = 0; i < tx_entry->iov_count; i++) {
		if (tx_entry->mr[i]) {
			err = fi_close((struct fid *)tx_entry->mr[i]);
			if (OFI_UNLIKELY(err)) {
				FI_WARN(&rxr_prov, FI_LOG_CQ, "mr dereg failed. err=%d\n", err);
				return err;
			}

			tx_entry->mr[i] = NULL;
		}
	}

	return 0;
}

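/*
 * Handle completion of all outgoing packets for a tx_entry: deregister any
 * in-line MRs, return the credits allocated for the transfer to the peer,
 * and write the appropriate completion depending on whether this was a send,
 * a write, or the remote side of an emulated read.
 */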
void rxr_cq_handle_tx_completion(struct rxr_ep *ep, struct rxr_tx_entry *tx_entry)
{
	int ret;
	struct rxr_peer *peer;

	if (tx_entry->state == RXR_TX_SEND)
		dlist_remove(&tx_entry->entry);

	if (efa_mr_cache_enable && rxr_ep_mr_local(ep)) {
		ret = rxr_tx_entry_mr_dereg(tx_entry);
		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_MR,
				"In-line memory deregistration failed with error: %s.\n",
				fi_strerror(-ret));
		}
	}

	peer = rxr_ep_get_peer(ep, tx_entry->addr);
	peer->tx_credits += tx_entry->credit_allocated;

	if (tx_entry->cq_entry.flags & FI_READ) {
		/*
		 * This must be the remote side of an emulated read;
		 * see the explanation in rxr_cq_handle_rx_completion().
		 */
		struct rxr_rx_entry *rx_entry = NULL;

		rx_entry = ofi_bufpool_get_ibuf(ep->rx_entry_pool, tx_entry->rma_loc_rx_id);
		assert(rx_entry);
		assert(rx_entry->state == RXR_RX_WAIT_READ_FINISH);

		if (ep->util_ep.caps & FI_RMA_EVENT) {
			rx_entry->cq_entry.len = rx_entry->total_len;
			rx_entry->bytes_done = rx_entry->total_len;
			efa_cntr_report_rx_completion(&ep->util_ep, rx_entry->cq_entry.flags);
		}

		rxr_release_rx_entry(ep, rx_entry);
		/* just release tx, do not write completion */
		rxr_release_tx_entry(ep, tx_entry);
	} else if (tx_entry->cq_entry.flags & FI_WRITE) {
		if (tx_entry->fi_flags & FI_COMPLETION) {
			rxr_cq_write_tx_completion(ep, tx_entry);
		} else {
			efa_cntr_report_tx_completion(&ep->util_ep, tx_entry->cq_entry.flags);
			rxr_release_tx_entry(ep, tx_entry);
		}
	} else {
		assert(tx_entry->cq_entry.flags & FI_SEND);
		rxr_cq_write_tx_completion(ep, tx_entry);
	}
}

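/* Destroy the utility CQ backing this CQ fid and free its memory. */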
static int rxr_cq_close(struct fid *fid)
{
	int ret;
	struct util_cq *cq;

	cq = container_of(fid, struct util_cq, cq_fid.fid);
	ret = ofi_cq_cleanup(cq);
	if (ret)
		return ret;
	free(cq);
	return 0;
}

static struct fi_ops rxr_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = rxr_cq_close,
	.bind = fi_no_bind,
	.control = fi_no_control,
	.ops_open = fi_no_ops_open,
};

static struct fi_ops_cq rxr_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = ofi_cq_read,
	.readfrom = ofi_cq_readfrom,
	.readerr = ofi_cq_readerr,
	.sread = fi_no_cq_sread,
	.sreadfrom = fi_no_cq_sreadfrom,
	.signal = fi_no_cq_signal,
	.strerror = rxr_cq_strerror,
};

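/*
 * Create a CQ for the RxR provider. Wait objects are not supported, and the
 * requested size is raised to the domain's recommended CQ size if needed.
 */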
int rxr_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
		struct fid_cq **cq_fid, void *context)
{
	int ret;
	struct util_cq *cq;
	struct rxr_domain *rxr_domain;

	if (attr->wait_obj != FI_WAIT_NONE)
		return -FI_ENOSYS;

	cq = calloc(1, sizeof(*cq));
	if (!cq)
		return -FI_ENOMEM;

	rxr_domain = container_of(domain, struct rxr_domain,
				  util_domain.domain_fid);
	/* Override user cq size if it's less than recommended cq size */
	attr->size = MAX(rxr_domain->cq_size, attr->size);

	ret = ofi_cq_init(&rxr_prov, domain, attr, cq,
			  &ofi_cq_progress, context);

	if (ret)
		goto free;

	*cq_fid = &cq->cq_fid;
	(*cq_fid)->fid.ops = &rxr_cq_fi_ops;
	(*cq_fid)->ops = &rxr_cq_ops;
	return 0;
free:
	free(cq);
	return ret;
}