/*
 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates.
 * All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials
 *   provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <ofi_iov.h>
#include <ofi_recvwin.h>
#include "rxr.h"
#include "rxr_rma.h"
#include "rxr_msg.h"
#include "rxr_cntr.h"
#include "rxr_atomic.h"
#include "efa.h"

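/*
 * Translate a provider-specific error into a human-readable string. The util
 * CQ cannot do this itself, so borrow the first endpoint bound to this CQ
 * (under the ep_list lock) and delegate to the underlying rdm CQ's
 * fi_cq_strerror().
 */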
static const char *rxr_cq_strerror(struct fid_cq *cq_fid, int prov_errno,
				   const void *err_data, char *buf, size_t len)
{
	struct fid_list_entry *fid_entry;
	struct util_ep *util_ep;
	struct util_cq *cq;
	struct rxr_ep *ep;
	const char *str;

	cq = container_of(cq_fid, struct util_cq, cq_fid);

	fastlock_acquire(&cq->ep_list_lock);
	assert(!dlist_empty(&cq->ep_list));
	fid_entry = container_of(cq->ep_list.next,
				 struct fid_list_entry, entry);
	util_ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
	ep = container_of(util_ep, struct rxr_ep, util_ep);

	str = fi_cq_strerror(ep->rdm_cq, prov_errno, err_data, buf, len);
	fastlock_release(&cq->ep_list_lock);
	return str;
}

/*
 * Teardown rx_entry and write an error cq entry. With our current protocol we
 * will only encounter an RX error when sending a queued REQ or CTS packet or
 * if we are sending a CTS message. Because of this, the sender will not send
 * any additional data packets if the receiver encounters an error. If there is
 * a scenario in the future where the sender will continue to send data
 * packets, we need to prevent an rx_id mismatch. Ideally, we should add a
 * NACK message and tear down both RX and TX entries, although whatever caused
 * the error may prevent that.
 *
 * TODO: add a NACK message to tear down state on sender side
 */
int rxr_cq_handle_rx_error(struct rxr_ep *ep, struct rxr_rx_entry *rx_entry,
			   ssize_t prov_errno)
{
	struct fi_cq_err_entry err_entry;
	struct util_cq *util_cq;
	struct dlist_entry *tmp;
	struct rxr_pkt_entry *pkt_entry;

	memset(&err_entry, 0, sizeof(err_entry));

	util_cq = ep->util_ep.rx_cq;

	err_entry.err = FI_EIO;
	err_entry.prov_errno = (int)prov_errno;

	switch (rx_entry->state) {
	case RXR_RX_INIT:
	case RXR_RX_UNEXP:
		dlist_remove(&rx_entry->entry);
		break;
	case RXR_RX_MATCHED:
		break;
	case RXR_RX_RECV:
#if ENABLE_DEBUG
		dlist_remove(&rx_entry->rx_pending_entry);
#endif
		break;
	case RXR_RX_QUEUED_CTRL:
	case RXR_RX_QUEUED_CTS_RNR:
	case RXR_RX_QUEUED_EOR:
		dlist_remove(&rx_entry->queued_entry);
		break;
	default:
		FI_WARN(&rxr_prov, FI_LOG_CQ, "rx_entry unknown state %d\n",
			rx_entry->state);
		assert(0 && "rx_entry unknown state");
	}

	dlist_foreach_container_safe(&rx_entry->queued_pkts,
				     struct rxr_pkt_entry,
				     pkt_entry, entry, tmp)
		rxr_pkt_entry_release_tx(ep, pkt_entry);

	if (rx_entry->unexp_pkt) {
		rxr_pkt_entry_release_rx(ep, rx_entry->unexp_pkt);
		rx_entry->unexp_pkt = NULL;
	}

	if (rx_entry->fi_flags & FI_MULTI_RECV)
		rxr_msg_multi_recv_handle_completion(ep, rx_entry);

	err_entry.flags = rx_entry->cq_entry.flags;
	if (rx_entry->state != RXR_RX_UNEXP)
		err_entry.op_context = rx_entry->cq_entry.op_context;
	err_entry.buf = rx_entry->cq_entry.buf;
	err_entry.data = rx_entry->cq_entry.data;
	err_entry.tag = rx_entry->cq_entry.tag;

	rxr_msg_multi_recv_free_posted_entry(ep, rx_entry);

	FI_WARN(&rxr_prov, FI_LOG_CQ,
		"rxr_cq_handle_rx_error: err: %d, prov_err: %s (%d)\n",
		err_entry.err, fi_strerror(-err_entry.prov_errno),
		err_entry.prov_errno);

	/*
	 * TODO: We can't free the rx_entry as we may receive additional
	 * packets for this entry. Add ref counting so the rx_entry can safely
	 * be freed once all packets are accounted for.
	 */
	//rxr_release_rx_entry(ep, rx_entry);

	efa_cntr_report_error(&ep->util_ep, err_entry.flags);
	return ofi_cq_write_error(util_cq, &err_entry);
}

/*
 * Teardown tx_entry and write an error cq entry. With our current protocol the
 * receiver will only send a CTS once the window is exhausted, meaning that all
 * data packets for that window will have been received successfully. This
 * means that the receiver will not send any CTS packets if the sender
 * encounters an error sending data packets. If that changes in the future we
 * will need to be careful to prevent a tx_id mismatch.
 *
 * TODO: add NACK message to tear down receive side state
 */
int rxr_cq_handle_tx_error(struct rxr_ep *ep, struct rxr_tx_entry *tx_entry,
			   ssize_t prov_errno)
{
	struct fi_cq_err_entry err_entry;
	struct util_cq *util_cq;
	uint32_t api_version;
	struct dlist_entry *tmp;
	struct rxr_pkt_entry *pkt_entry;

	memset(&err_entry, 0, sizeof(err_entry));

	util_cq = ep->util_ep.tx_cq;
	api_version = util_cq->domain->fabric->fabric_fid.api_version;

	err_entry.err = FI_EIO;
	err_entry.prov_errno = (int)prov_errno;

	switch (tx_entry->state) {
	case RXR_TX_REQ:
		break;
	case RXR_TX_SEND:
		dlist_remove(&tx_entry->entry);
		break;
	case RXR_TX_QUEUED_CTRL:
	case RXR_TX_QUEUED_SHM_RMA:
	case RXR_TX_QUEUED_REQ_RNR:
	case RXR_TX_QUEUED_DATA_RNR:
		dlist_remove(&tx_entry->queued_entry);
		break;
	case RXR_TX_SENT_READRSP:
	case RXR_TX_WAIT_READ_FINISH:
		break;
	default:
		FI_WARN(&rxr_prov, FI_LOG_CQ, "tx_entry unknown state %d\n",
			tx_entry->state);
		assert(0 && "tx_entry unknown state");
	}

	dlist_foreach_container_safe(&tx_entry->queued_pkts,
				     struct rxr_pkt_entry,
				     pkt_entry, entry, tmp)
		rxr_pkt_entry_release_tx(ep, pkt_entry);

	err_entry.flags = tx_entry->cq_entry.flags;
	err_entry.op_context = tx_entry->cq_entry.op_context;
	err_entry.buf = tx_entry->cq_entry.buf;
	err_entry.data = tx_entry->cq_entry.data;
	err_entry.tag = tx_entry->cq_entry.tag;
	if (FI_VERSION_GE(api_version, FI_VERSION(1, 5)))
		err_entry.err_data_size = 0;

	FI_WARN(&rxr_prov, FI_LOG_CQ,
		"rxr_cq_handle_tx_error: err: %d, prov_err: %s (%d)\n",
		err_entry.err, fi_strerror(-err_entry.prov_errno),
		err_entry.prov_errno);

	/*
	 * TODO: We can't free the tx_entry as we may receive a control packet
	 * for this entry. Add ref counting so the tx_entry can safely be
	 * freed once all packets are accounted for.
	 */
	//rxr_release_tx_entry(ep, tx_entry);

	efa_cntr_report_error(&ep->util_ep, tx_entry->cq_entry.flags);
	return ofi_cq_write_error(util_cq, &err_entry);
}

/*
 * Queue a packet on the appropriate list when an RNR error is received.
 */
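/*
 * The fields set below (rnr_ts, timeout_interval, rnr_timeout_exp) are
 * consumed by the progress engine, not here. A minimal sketch of the expected
 * expiry check, assuming exponential backoff capped by a configurable
 * maximum (the authoritative logic lives outside this file):
 *
 *	uint64_t wait_us = MIN(rxr_env.max_timeout,
 *			       peer->timeout_interval *
 *			       (1 << peer->rnr_timeout_exp));
 *	if (ofi_gettime_us() - peer->rnr_ts >= wait_us)
 *		... peer exits backoff, queued packets are retransmitted ...
 */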
static inline void rxr_cq_queue_pkt(struct rxr_ep *ep,
				    struct dlist_entry *list,
				    struct rxr_pkt_entry *pkt_entry)
{
	struct rxr_peer *peer;

	peer = rxr_ep_get_peer(ep, pkt_entry->addr);

	/*
	 * Queue the packet if it has not been retransmitted yet.
	 */
	if (pkt_entry->state != RXR_PKT_ENTRY_RNR_RETRANSMIT) {
		pkt_entry->state = RXR_PKT_ENTRY_RNR_RETRANSMIT;
		peer->rnr_queued_pkt_cnt++;
		goto queue_pkt;
	}

	/*
	 * Otherwise, increase the backoff if the peer is not already in
	 * backoff. Reset the timer when starting backoff or if another RNR
	 * for a retransmitted packet is received while waiting for the timer
	 * to expire.
	 */
	peer->rnr_ts = ofi_gettime_us();
	if (peer->flags & RXR_PEER_IN_BACKOFF)
		goto queue_pkt;

	peer->flags |= RXR_PEER_IN_BACKOFF;

	if (!peer->timeout_interval) {
		if (rxr_env.timeout_interval)
			peer->timeout_interval = rxr_env.timeout_interval;
		else
			peer->timeout_interval = MAX(RXR_RAND_MIN_TIMEOUT,
						     rand() %
						     RXR_RAND_MAX_TIMEOUT);

		peer->rnr_timeout_exp = 1;
		FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
		       "initializing backoff timeout for peer: %" PRIu64
		       " timeout: %d rnr_queued_pkts: %d\n",
		       pkt_entry->addr, peer->timeout_interval,
		       peer->rnr_queued_pkt_cnt);
	} else {
		/* Only backoff once per peer per progress thread loop. */
		if (!(peer->flags & RXR_PEER_BACKED_OFF)) {
			peer->flags |= RXR_PEER_BACKED_OFF;
			peer->rnr_timeout_exp++;
			FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
			       "increasing backoff for peer: %" PRIu64
			       " rnr_timeout_exp: %d rnr_queued_pkts: %d\n",
			       pkt_entry->addr, peer->rnr_timeout_exp,
			       peer->rnr_queued_pkt_cnt);
		}
	}
	dlist_insert_tail(&peer->rnr_entry,
			  &ep->peer_backoff_list);

queue_pkt:
#if ENABLE_DEBUG
	dlist_remove(&pkt_entry->dbg_entry);
#endif
	dlist_insert_tail(&pkt_entry->entry, list);
}
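/*
 * Handle an error (-FI_EAVAIL or a read failure) reported by the core
 * provider's CQ: read the error entry, squelch harmless HANDSHAKE failures,
 * requeue packets that hit RNR (-FI_EAGAIN) for backoff and retransmit, and
 * turn everything else into an error completion, or an EQ error when no
 * rx/tx entry can be identified.
 */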
int rxr_cq_handle_cq_error(struct rxr_ep *ep, ssize_t err)
{
	struct fi_cq_err_entry err_entry;
	struct rxr_pkt_entry *pkt_entry;
	struct rxr_rx_entry *rx_entry;
	struct rxr_tx_entry *tx_entry;
	struct rxr_peer *peer;
	ssize_t ret;

	memset(&err_entry, 0, sizeof(err_entry));

	/*
	 * If the cq_read failed with an error besides -FI_EAVAIL, or the
	 * cq_readerr fails, we don't know if this is an rx or tx error.
	 * We'll write an error entry to the event queue instead.
	 */

	err_entry.err = FI_EIO;
	err_entry.prov_errno = (int)err;

	if (err != -FI_EAVAIL) {
		FI_WARN(&rxr_prov, FI_LOG_CQ, "fi_cq_read: %s\n",
			fi_strerror(-err));
		goto write_err;
	}

	ret = fi_cq_readerr(ep->rdm_cq, &err_entry, 0);
	if (ret != 1) {
		if (ret < 0) {
			FI_WARN(&rxr_prov, FI_LOG_CQ, "fi_cq_readerr: %s\n",
				fi_strerror(-ret));
			err_entry.prov_errno = (int)ret;
		} else {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"fi_cq_readerr unexpected size %zd expected %zu\n",
				ret, sizeof(err_entry));
			err_entry.prov_errno = -FI_EIO;
		}
		goto write_err;
	}

	if (err_entry.err != -FI_EAGAIN)
		OFI_CQ_STRERROR(&rxr_prov, FI_LOG_WARN, FI_LOG_CQ, ep->rdm_cq,
				&err_entry);

	pkt_entry = (struct rxr_pkt_entry *)err_entry.op_context;
	peer = rxr_ep_get_peer(ep, pkt_entry->addr);

	/*
	 * A handshake send could fail at the core provider if the peer
	 * endpoint is shutdown soon after it receives a send completion for
	 * the REQ packet that included src_address. The handshake itself is
	 * irrelevant if that happens, so just squelch this error entry and
	 * move on without writing an error completion or event to the
	 * application.
	 */
	if (rxr_get_base_hdr(pkt_entry->pkt)->type == RXR_HANDSHAKE_PKT) {
		FI_WARN(&rxr_prov, FI_LOG_CQ,
			"Squelching error CQE for RXR_HANDSHAKE_PKT\n");
		/*
		 * HANDSHAKE packets do not have an associated rx/tx entry.
		 * Use the flags instead to determine if this is a send or
		 * recv.
		 */
		if (err_entry.flags & FI_SEND) {
			rxr_ep_dec_tx_pending(ep, peer, 1);
			rxr_pkt_entry_release_tx(ep, pkt_entry);
		} else if (err_entry.flags & FI_RECV) {
			rxr_pkt_entry_release_rx(ep, pkt_entry);
		} else {
			assert(0 && "unknown err_entry flags in HANDSHAKE packet");
		}
		return 0;
	}

	if (!pkt_entry->x_entry) {
		/*
		 * A NULL x_entry means this is a recv posted buf pkt_entry.
		 * Since we don't have any context besides the error code,
		 * we will write to the eq instead.
		 */
		rxr_pkt_entry_release_rx(ep, pkt_entry);
		goto write_err;
	}

	/*
	 * If x_entry is set, this rx or tx entry error is for a sent
	 * packet. Decrement the tx_pending counter and fall through to
	 * the rx or tx entry handlers.
	 */
	if (!peer->is_local)
		rxr_ep_dec_tx_pending(ep, peer, 1);
	if (RXR_GET_X_ENTRY_TYPE(pkt_entry) == RXR_TX_ENTRY) {
		tx_entry = (struct rxr_tx_entry *)pkt_entry->x_entry;
		if (err_entry.err != -FI_EAGAIN ||
		    rxr_ep_domain(ep)->resource_mgmt == FI_RM_ENABLED) {
			ret = rxr_cq_handle_tx_error(ep, tx_entry,
						     err_entry.prov_errno);
			rxr_pkt_entry_release_tx(ep, pkt_entry);
			return ret;
		}

		rxr_cq_queue_pkt(ep, &tx_entry->queued_pkts, pkt_entry);
		if (tx_entry->state == RXR_TX_SEND) {
			dlist_remove(&tx_entry->entry);
			tx_entry->state = RXR_TX_QUEUED_DATA_RNR;
			dlist_insert_tail(&tx_entry->queued_entry,
					  &ep->tx_entry_queued_list);
		} else if (tx_entry->state == RXR_TX_REQ) {
			tx_entry->state = RXR_TX_QUEUED_REQ_RNR;
			dlist_insert_tail(&tx_entry->queued_entry,
					  &ep->tx_entry_queued_list);
		}
		return 0;
	} else if (RXR_GET_X_ENTRY_TYPE(pkt_entry) == RXR_RX_ENTRY) {
		rx_entry = (struct rxr_rx_entry *)pkt_entry->x_entry;
		if (err_entry.err != -FI_EAGAIN ||
		    rxr_ep_domain(ep)->resource_mgmt == FI_RM_ENABLED) {
			ret = rxr_cq_handle_rx_error(ep, rx_entry,
						     err_entry.prov_errno);
			rxr_pkt_entry_release_tx(ep, pkt_entry);
			return ret;
		}
		rxr_cq_queue_pkt(ep, &rx_entry->queued_pkts, pkt_entry);
		if (rx_entry->state == RXR_RX_RECV) {
			rx_entry->state = RXR_RX_QUEUED_CTS_RNR;
			dlist_insert_tail(&rx_entry->queued_entry,
					  &ep->rx_entry_queued_list);
		}
		return 0;
	}

	FI_WARN(&rxr_prov, FI_LOG_CQ,
		"%s unknown x_entry state %d\n",
		__func__, RXR_GET_X_ENTRY_TYPE(pkt_entry));
	assert(0 && "unknown x_entry state");
write_err:
	efa_eq_write_error(&ep->util_ep, err_entry.err, err_entry.prov_errno);
	return 0;
}
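/*
 * Write a receive completion (or a truncation error if the posted buffer was
 * too small) to the application CQ, honoring FI_SELECTIVE_COMPLETION and
 * FI_MULTI_RECV semantics, then update the rx counters.
 */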
void rxr_cq_write_rx_completion(struct rxr_ep *ep,
				struct rxr_rx_entry *rx_entry)
{
	struct util_cq *rx_cq = ep->util_ep.rx_cq;
	int ret = 0;

	if (OFI_UNLIKELY(rx_entry->cq_entry.len < rx_entry->total_len)) {
		FI_WARN(&rxr_prov, FI_LOG_CQ,
			"Message truncated: tag: %"PRIu64" total_len: %"PRIu64" len: %zu\n",
			rx_entry->cq_entry.tag, rx_entry->total_len,
			rx_entry->cq_entry.len);

		ret = ofi_cq_write_error_trunc(ep->util_ep.rx_cq,
					       rx_entry->cq_entry.op_context,
					       rx_entry->cq_entry.flags,
					       rx_entry->total_len,
					       rx_entry->cq_entry.buf,
					       rx_entry->cq_entry.data,
					       rx_entry->cq_entry.tag,
					       rx_entry->total_len -
					       rx_entry->cq_entry.len);

		rxr_rm_rx_cq_check(ep, rx_cq);

		if (OFI_UNLIKELY(ret))
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Unable to write recv error cq entry: %s\n",
				fi_strerror(-ret));

		efa_cntr_report_error(&ep->util_ep, rx_entry->cq_entry.flags);
		return;
	}

	if (!(rx_entry->rxr_flags & RXR_RECV_CANCEL) &&
	    (ofi_need_completion(rxr_rx_flags(ep), rx_entry->fi_flags) ||
	     (rx_entry->cq_entry.flags & FI_MULTI_RECV))) {
		FI_DBG(&rxr_prov, FI_LOG_CQ,
		       "Writing recv completion for rx_entry from peer: %"
		       PRIu64 " rx_id: %" PRIu32 " msg_id: %" PRIu32
		       " tag: %lx total_len: %" PRIu64 "\n",
		       rx_entry->addr, rx_entry->rx_id, rx_entry->msg_id,
		       rx_entry->cq_entry.tag, rx_entry->total_len);

		if (ep->util_ep.caps & FI_SOURCE)
			ret = ofi_cq_write_src(rx_cq,
					       rx_entry->cq_entry.op_context,
					       rx_entry->cq_entry.flags,
					       rx_entry->cq_entry.len,
					       rx_entry->cq_entry.buf,
					       rx_entry->cq_entry.data,
					       rx_entry->cq_entry.tag,
					       rx_entry->addr);
		else
			ret = ofi_cq_write(rx_cq,
					   rx_entry->cq_entry.op_context,
					   rx_entry->cq_entry.flags,
					   rx_entry->cq_entry.len,
					   rx_entry->cq_entry.buf,
					   rx_entry->cq_entry.data,
					   rx_entry->cq_entry.tag);

		rxr_rm_rx_cq_check(ep, rx_cq);

		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Unable to write recv completion: %s\n",
				fi_strerror(-ret));
			if (rxr_cq_handle_rx_error(ep, rx_entry, ret))
				assert(0 && "failed to write err cq entry");
			return;
		}
	}

	efa_cntr_report_rx_completion(&ep->util_ep, rx_entry->cq_entry.flags);
}

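/*
 * Dispatch a completed receive based on the operation type: the remote side
 * of an RMA write (CQ data and/or RMA-event counters), the initiator side of
 * an emulated read (completes the matching tx_entry; see the diagram below),
 * or a plain message/tagged receive.
 */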
void rxr_cq_handle_rx_completion(struct rxr_ep *ep,
				 struct rxr_pkt_entry *pkt_entry,
				 struct rxr_rx_entry *rx_entry)
{
	struct rxr_tx_entry *tx_entry = NULL;

	if (rx_entry->cq_entry.flags & FI_WRITE) {
		/*
		 * Must be on the remote side; notify the cq/counter
		 * if FI_RMA_EVENT is requested or REMOTE_CQ_DATA is on.
		 */
		if (rx_entry->cq_entry.flags & FI_REMOTE_CQ_DATA)
			rxr_cq_write_rx_completion(ep, rx_entry);
		else if (ep->util_ep.caps & FI_RMA_EVENT)
			efa_cntr_report_rx_completion(&ep->util_ep, rx_entry->cq_entry.flags);

		rxr_pkt_entry_release_rx(ep, pkt_entry);
		return;
	}

	if (rx_entry->cq_entry.flags & FI_READ) {
		/* Note for emulated FI_READ, there is an rx_entry on
		 * both the initiator side and the remote side.
		 * However, rxr_cq_handle_rx_completion() will only be
		 * called on the initiator side.
		 * The following shows the sequence of events:
		 *
		 *     Initiator side                Remote side
		 *     create tx_entry
		 *     create rx_entry
		 *     send rtr(with rx_id)
		 *                                   receive rtr
		 *                                   create rx_entry
		 *                                   create tx_entry
		 *                                   tx_entry sending data
		 *     rx_entry receiving data
		 *     receive completed             send completed
		 *     handle_rx_completion()        handle_pkt_send_completion()
		 *     |->write_tx_completion()      |-> if (FI_RMA_EVENT)
		 *                                           write_rx_completion()
		 *
		 * As can be seen, although there is an rx_entry on the remote
		 * side, it never enters rxr_cq_handle_rx_completion().
		 * So at this point we must be on the initiator side, where we
		 * 1. find the corresponding tx_entry
		 * 2. call rxr_cq_write_tx_completion()
		 */
		tx_entry = ofi_bufpool_get_ibuf(ep->tx_entry_pool, rx_entry->rma_loc_tx_id);
		assert(tx_entry->state == RXR_TX_WAIT_READ_FINISH);
		if (tx_entry->fi_flags & FI_COMPLETION) {
			/* Note write_tx_completion() will release tx_entry */
			rxr_cq_write_tx_completion(ep, tx_entry);
		} else {
			efa_cntr_report_tx_completion(&ep->util_ep, tx_entry->cq_entry.flags);
			rxr_release_tx_entry(ep, tx_entry);
		}

		/*
		 * Do not call rxr_release_rx_entry() here because
		 * the caller will release it.
		 */
		rxr_pkt_entry_release_rx(ep, pkt_entry);
		return;
	}

	if (rx_entry->fi_flags & FI_MULTI_RECV)
		rxr_msg_multi_recv_handle_completion(ep, rx_entry);

	rxr_cq_write_rx_completion(ep, rx_entry);
	rxr_pkt_entry_release_rx(ep, pkt_entry);
}

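/*
 * Insert a received RTM/RTA packet into the peer's reorder window if it
 * arrived out of order. Returns 0 when the message is the next expected one
 * (the caller should process it now), 1 when it was queued for later,
 * -FI_EALREADY when the msg_id falls outside the window (a duplicate), and
 * -FI_ENOMEM if cloning the packet fails.
 */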
int rxr_cq_reorder_msg(struct rxr_ep *ep,
		       struct rxr_peer *peer,
		       struct rxr_pkt_entry *pkt_entry)
{
	struct rxr_pkt_entry *ooo_entry;
	struct rxr_pkt_entry *cur_ooo_entry;
	uint32_t msg_id;

	assert(rxr_get_base_hdr(pkt_entry->pkt)->type >= RXR_REQ_PKT_BEGIN);

	msg_id = rxr_pkt_msg_id(pkt_entry);
	/*
	 * TODO: Initialize peer state at the time of AV insertion
	 * where duplicate detection is available.
	 */
	if (!peer->rx_init)
		rxr_ep_peer_init_rx(ep, peer);

#if ENABLE_DEBUG
	if (msg_id != ofi_recvwin_next_exp_id(peer->robuf))
		FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
		       "msg OOO msg_id: %" PRIu32 " expected: %"
		       PRIu32 "\n", msg_id,
		       ofi_recvwin_next_exp_id(peer->robuf));
#endif
	if (ofi_recvwin_is_exp(peer->robuf, msg_id))
		return 0;
	else if (!ofi_recvwin_id_valid(peer->robuf, msg_id))
		return -FI_EALREADY;

	if (OFI_LIKELY(rxr_env.rx_copy_ooo)) {
		assert(pkt_entry->type == RXR_PKT_ENTRY_POSTED);
		ooo_entry = rxr_pkt_entry_clone(ep, ep->rx_ooo_pkt_pool, pkt_entry, RXR_PKT_ENTRY_OOO);
		if (OFI_UNLIKELY(!ooo_entry)) {
			FI_WARN(&rxr_prov, FI_LOG_EP_CTRL,
				"Unable to allocate rx_pkt_entry for OOO msg\n");
			return -FI_ENOMEM;
		}
		rxr_pkt_entry_release_rx(ep, pkt_entry);
	} else {
		ooo_entry = pkt_entry;
	}

	cur_ooo_entry = *ofi_recvwin_get_msg(peer->robuf, msg_id);
	if (cur_ooo_entry) {
		assert(rxr_get_base_hdr(cur_ooo_entry->pkt)->type == RXR_MEDIUM_MSGRTM_PKT ||
		       rxr_get_base_hdr(cur_ooo_entry->pkt)->type == RXR_MEDIUM_TAGRTM_PKT);
		assert(rxr_pkt_msg_id(cur_ooo_entry) == msg_id);
		assert(rxr_pkt_rtm_total_len(cur_ooo_entry) == rxr_pkt_rtm_total_len(ooo_entry));
		rxr_pkt_entry_append(cur_ooo_entry, ooo_entry);
	} else {
		ofi_recvwin_queue_msg(peer->robuf, &ooo_entry, msg_id);
	}

	return 1;
}

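/*
 * Drain the peer's reorder window: process queued messages in msg_id order
 * until the next expected slot is empty or processing fails.
 */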
void rxr_cq_proc_pending_items_in_recvwin(struct rxr_ep *ep,
					  struct rxr_peer *peer)
{
	struct rxr_pkt_entry *pending_pkt;
	int ret = 0;
	uint32_t msg_id;

	while (1) {
		pending_pkt = *ofi_recvwin_peek(peer->robuf);
		if (!pending_pkt || !pending_pkt->pkt)
			return;

		msg_id = rxr_pkt_msg_id(pending_pkt);
		FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
		       "Processing msg_id %" PRIu32 " from robuf\n", msg_id);
		/* rxr_pkt_proc_rtm_rta() will write an error cq entry if needed */
		ret = rxr_pkt_proc_rtm_rta(ep, pending_pkt);
		*ofi_recvwin_get_next_msg(peer->robuf) = NULL;
		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Error processing msg_id %" PRIu32 " from robuf: %s\n",
				msg_id, fi_strerror(-ret));
			return;
		}
	}
}

/*
 * Handle two scenarios, writing a completion for both:
 *  1. RMA writes with immediate data at the remote endpoint,
 *  2. atomic completion on the requester.
 */
void rxr_cq_handle_shm_completion(struct rxr_ep *ep, struct fi_cq_data_entry *cq_entry, fi_addr_t src_addr)
{
	struct util_cq *target_cq;
	int ret;

	if (cq_entry->flags & FI_ATOMIC) {
		target_cq = ep->util_ep.tx_cq;
	} else {
		assert(cq_entry->flags & FI_REMOTE_CQ_DATA);
		target_cq = ep->util_ep.rx_cq;
	}

	if (ep->util_ep.caps & FI_SOURCE)
		ret = ofi_cq_write_src(target_cq,
				       cq_entry->op_context,
				       cq_entry->flags,
				       cq_entry->len,
				       cq_entry->buf,
				       cq_entry->data,
				       0,
				       src_addr);
	else
		ret = ofi_cq_write(target_cq,
				   cq_entry->op_context,
				   cq_entry->flags,
				   cq_entry->len,
				   cq_entry->buf,
				   cq_entry->data,
				   0);

	rxr_rm_rx_cq_check(ep, target_cq);

	if (OFI_UNLIKELY(ret)) {
		FI_WARN(&rxr_prov, FI_LOG_CQ,
			"Unable to write a cq entry for shm operation: %s\n",
			fi_strerror(-ret));
		efa_eq_write_error(&ep->util_ep, FI_EIO, ret);
	}

	if (cq_entry->flags & FI_ATOMIC) {
		efa_cntr_report_tx_completion(&ep->util_ep, cq_entry->flags);
	} else {
		assert(cq_entry->flags & FI_REMOTE_CQ_DATA);
		efa_cntr_report_rx_completion(&ep->util_ep, cq_entry->flags);
	}
}

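/*
 * Decide whether a send completion should be written for this tx_entry,
 * combining the provider-internal RXR_NO_COMPLETION flag with the
 * FI_SELECTIVE_COMPLETION rules described below.
 */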
static inline
bool rxr_cq_need_tx_completion(struct rxr_ep *ep,
			       struct rxr_tx_entry *tx_entry)
{
	if (tx_entry->fi_flags & RXR_NO_COMPLETION)
		return false;

	/*
	 * ep->util_ep.tx_msg_flags is either 0 or FI_COMPLETION, depending on
	 * whether the app specified FI_SELECTIVE_COMPLETION when binding the
	 * CQ. (ep->util_ep.tx_msg_flags was set in ofi_ep_bind_cq())
	 *
	 * If tx_msg_flags is 0, we only write a completion when the app
	 * specifies FI_COMPLETION in flags.
	 */
	return ep->util_ep.tx_msg_flags == FI_COMPLETION ||
	       tx_entry->fi_flags & FI_COMPLETION;
}

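/*
 * Write a send completion to the application CQ if one is required, report
 * the tx counter, and on success release the tx_entry; callers must not
 * touch the tx_entry afterwards.
 */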
void rxr_cq_write_tx_completion(struct rxr_ep *ep,
				struct rxr_tx_entry *tx_entry)
{
	struct util_cq *tx_cq = ep->util_ep.tx_cq;
	int ret;

	if (rxr_cq_need_tx_completion(ep, tx_entry)) {
		FI_DBG(&rxr_prov, FI_LOG_CQ,
		       "Writing send completion for tx_entry to peer: %" PRIu64
		       " tx_id: %" PRIu32 " msg_id: %" PRIu32 " tag: %lx len: %"
		       PRIu64 "\n",
		       tx_entry->addr, tx_entry->tx_id, tx_entry->msg_id,
		       tx_entry->cq_entry.tag, tx_entry->total_len);

		/* TX completions should not send the peer address to the util_cq */
		if (ep->util_ep.caps & FI_SOURCE)
			ret = ofi_cq_write_src(tx_cq,
					       tx_entry->cq_entry.op_context,
					       tx_entry->cq_entry.flags,
					       tx_entry->cq_entry.len,
					       tx_entry->cq_entry.buf,
					       tx_entry->cq_entry.data,
					       tx_entry->cq_entry.tag,
					       FI_ADDR_NOTAVAIL);
		else
			ret = ofi_cq_write(tx_cq,
					   tx_entry->cq_entry.op_context,
					   tx_entry->cq_entry.flags,
					   tx_entry->cq_entry.len,
					   tx_entry->cq_entry.buf,
					   tx_entry->cq_entry.data,
					   tx_entry->cq_entry.tag);

		rxr_rm_tx_cq_check(ep, tx_cq);

		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ,
				"Unable to write send completion: %s\n",
				fi_strerror(-ret));
			if (rxr_cq_handle_tx_error(ep, tx_entry, ret))
				assert(0 && "failed to write err cq entry");
			return;
		}
	}

	efa_cntr_report_tx_completion(&ep->util_ep, tx_entry->cq_entry.flags);
	rxr_release_tx_entry(ep, tx_entry);
}

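/*
 * Deregister every memory region registered for this tx_entry's iovecs.
 * Stops at the first fi_close() failure and returns its error, otherwise
 * returns 0.
 */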
int rxr_tx_entry_mr_dereg(struct rxr_tx_entry *tx_entry)
{
	int i, err = 0;

	for (i = 0; i < tx_entry->iov_count; i++) {
		if (tx_entry->mr[i]) {
			err = fi_close((struct fid *)tx_entry->mr[i]);
			if (OFI_UNLIKELY(err)) {
				FI_WARN(&rxr_prov, FI_LOG_CQ, "mr dereg failed. err=%d\n", err);
				return err;
			}

			tx_entry->mr[i] = NULL;
		}
	}

	return 0;
}

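/*
 * Handle a completed send operation: deregister cached MRs if needed, return
 * credits to the peer, then complete the operation according to its type
 * (emulated read response, RMA write, or plain send).
 */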
void rxr_cq_handle_tx_completion(struct rxr_ep *ep, struct rxr_tx_entry *tx_entry)
{
	int ret;
	struct rxr_peer *peer;

	if (tx_entry->state == RXR_TX_SEND)
		dlist_remove(&tx_entry->entry);

	if (efa_mr_cache_enable && rxr_ep_mr_local(ep)) {
		ret = rxr_tx_entry_mr_dereg(tx_entry);
		if (OFI_UNLIKELY(ret)) {
			FI_WARN(&rxr_prov, FI_LOG_MR,
				"In-line memory deregistration failed with error: %s.\n",
				fi_strerror(-ret));
		}
	}

	peer = rxr_ep_get_peer(ep, tx_entry->addr);
	peer->tx_credits += tx_entry->credit_allocated;

	if (tx_entry->cq_entry.flags & FI_READ) {
		/*
		 * This must be on the remote side; see the explanation in
		 * rxr_cq_handle_rx_completion().
		 */
		struct rxr_rx_entry *rx_entry = NULL;

		rx_entry = ofi_bufpool_get_ibuf(ep->rx_entry_pool, tx_entry->rma_loc_rx_id);
		assert(rx_entry);
		assert(rx_entry->state == RXR_RX_WAIT_READ_FINISH);

		if (ep->util_ep.caps & FI_RMA_EVENT) {
			rx_entry->cq_entry.len = rx_entry->total_len;
			rx_entry->bytes_done = rx_entry->total_len;
			efa_cntr_report_rx_completion(&ep->util_ep, rx_entry->cq_entry.flags);
		}

		rxr_release_rx_entry(ep, rx_entry);
		/* just release tx, do not write completion */
		rxr_release_tx_entry(ep, tx_entry);
	} else if (tx_entry->cq_entry.flags & FI_WRITE) {
		if (tx_entry->fi_flags & FI_COMPLETION) {
			rxr_cq_write_tx_completion(ep, tx_entry);
		} else {
			efa_cntr_report_tx_completion(&ep->util_ep, tx_entry->cq_entry.flags);
			rxr_release_tx_entry(ep, tx_entry);
		}
	} else {
		assert(tx_entry->cq_entry.flags & FI_SEND);
		rxr_cq_write_tx_completion(ep, tx_entry);
	}
}

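/* Release the resources of a CQ created by rxr_cq_open(). */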
static int rxr_cq_close(struct fid *fid)
{
	int ret;
	struct util_cq *cq;

	cq = container_of(fid, struct util_cq, cq_fid.fid);
	ret = ofi_cq_cleanup(cq);
	if (ret)
		return ret;
	free(cq);
	return 0;
}

static struct fi_ops rxr_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = rxr_cq_close,
	.bind = fi_no_bind,
	.control = fi_no_control,
	.ops_open = fi_no_ops_open,
};

static struct fi_ops_cq rxr_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = ofi_cq_read,
	.readfrom = ofi_cq_readfrom,
	.readerr = ofi_cq_readerr,
	.sread = fi_no_cq_sread,
	.sreadfrom = fi_no_cq_sreadfrom,
	.signal = fi_no_cq_signal,
	.strerror = rxr_cq_strerror,
};

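/*
 * Create a CQ for an RxR endpoint. Wait objects are not supported, the
 * requested size is raised to the domain's recommended minimum, and the util
 * CQ is initialized with the standard ofi progress function before the RxR
 * ops are installed.
 */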
int rxr_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
		struct fid_cq **cq_fid, void *context)
{
	int ret;
	struct util_cq *cq;
	struct rxr_domain *rxr_domain;

	if (attr->wait_obj != FI_WAIT_NONE)
		return -FI_ENOSYS;

	cq = calloc(1, sizeof(*cq));
	if (!cq)
		return -FI_ENOMEM;

	rxr_domain = container_of(domain, struct rxr_domain,
				  util_domain.domain_fid);
	/* Override the user's cq size if it is less than the recommended cq size. */
	attr->size = MAX(rxr_domain->cq_size, attr->size);

	ret = ofi_cq_init(&rxr_prov, domain, attr, cq,
			  &ofi_cq_progress, context);
	if (ret)
		goto free;

	*cq_fid = &cq->cq_fid;
	(*cq_fid)->fid.ops = &rxr_cq_fi_ops;
	(*cq_fid)->ops = &rxr_cq_ops;
	return 0;
free:
	free(cq);
	return ret;
}
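
/*
 * For reference, a minimal sketch of how an application reaches
 * rxr_cq_open() (standard libfabric usage; the variable names are generic
 * and not part of this file):
 *
 *	struct fi_cq_attr cq_attr = { .format = FI_CQ_FORMAT_DATA };
 *	struct fid_cq *cq;
 *	int ret = fi_cq_open(domain, &cq_attr, &cq, NULL);
 *	if (!ret)
 *		ret = fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV);
 *
 * fi_cq_open() dispatches to the provider's cq_open entry point, which for
 * this provider is rxr_cq_open() above.
 */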