1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2020.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #ifdef HAVE_CONFIG_H
8 #  include "config.h"
9 #endif
10 
11 #include "dc_mlx5_ep.h"
12 #include "dc_mlx5.h"
13 
14 #include <uct/ib/rc/accel/rc_mlx5.inl>
15 #include <uct/ib/mlx5/ib_mlx5_log.h>
16 
/* Resolve the TX queue-pair and work queue of the DCI currently assigned
 * to _ep, storing them into the caller-provided _txqp / _txwq pointers.
 * Wrapped in do { ... } while (0) so the macro expands to exactly one
 * statement and is safe inside unbraced if/else bodies (the previous bare
 * { ... } form left a stray ';' that breaks "if (...) MACRO(); else ...").
 * The ep must own a valid DCI; no validity check is performed here. */
#define UCT_DC_MLX5_IFACE_TXQP_GET(_iface, _ep, _txqp, _txwq) \
    do { \
        uint8_t _dci = (_ep)->dci; \
        _txqp = &(_iface)->tx.dcis[_dci].txqp; \
        _txwq = &(_iface)->tx.dcis[_dci].txwq; \
    } while (0)
24 
/* Post a buffered-copy (bcopy) work request on the DCI bound to @ep.
 *
 * The send descriptor's sequence number is stamped with the work queue's
 * software producer index *before* posting, so completion processing can
 * match the CQE and release the descriptor.
 *
 * @param opcode      mlx5 WQE opcode (send / RDMA write / RDMA read).
 * @param length      payload length in bytes.
 * @param rdma_raddr  remote address (meaningful for RDMA opcodes only).
 * @param rdma_rkey   remote key (meaningful for RDMA opcodes only).
 * @param desc        send descriptor providing the bounce buffer lkey.
 * @param send_flags  extra MLX5_WQE_CTRL_* bits OR-ed with CQ_UPDATE.
 * @param imm_val_be  immediate data in big-endian, 0 when unused.
 * @param buffer      local buffer to transmit.
 * @param log_sge     optional SGE description for logging; may be NULL.
 */
static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_iface_bcopy_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
                            unsigned opcode, unsigned length,
                            /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
                            uct_rc_iface_send_desc_t *desc, uint8_t send_flags,
                            uint32_t imm_val_be, const void *buffer,
                            uct_ib_log_sge_t *log_sge)
{
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* Stamp before posting: sw_pi advances inside the post call */
    desc->super.sn = txwq->sw_pi;
    uct_rc_mlx5_txqp_dptr_post(&iface->super, UCT_IB_QPT_DCI, txqp, txwq,
                               opcode, buffer, length, &desc->lkey,
                               rdma_raddr, rdma_rkey, 0, 0, 0, 0,
                               &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                               uct_ib_mlx5_wqe_av_size(&ep->av),
                               MLX5_WQE_CTRL_CQ_UPDATE | send_flags, imm_val_be, INT_MAX,
                               log_sge);
    /* Track the outstanding operation for completion-time release */
    uct_rc_txqp_add_send_op(txqp, &desc->super);
}
46 
47 
/* Post a zero-copy (scatter/gather IOV) work request on the DCI bound to
 * @ep and register a completion callback for it.
 *
 * The producer index is captured before the post so the completion entry
 * is registered under the sequence number of this very WQE.
 *
 * @param opcode            mlx5 WQE opcode (send / RDMA / tag variants).
 * @param iov, iovcnt       local scatter/gather list.
 * @param iov_total_length  total payload bytes (used for completion stats).
 * @param am_id/am_hdr/...  active-message header fields (SEND opcodes).
 * @param rdma_raddr/rkey   remote address/key (RDMA opcodes).
 * @param tag/app_ctx/...   tag-matching fields (tag opcodes).
 * @param handler           callback invoked when the send completes.
 * @param comp              user completion, may be NULL.
 * @param send_flags        extra MLX5_WQE_CTRL_* bits OR-ed with CQ_UPDATE.
 */
static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_iface_zcopy_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
                             unsigned opcode, const uct_iov_t *iov, size_t iovcnt,
                             size_t iov_total_length,
                             /* SEND */ uint8_t am_id, const void *am_hdr, unsigned am_hdr_len,
                             /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
                             /* TAG  */ uct_tag_t tag, uint32_t app_ctx, uint32_t ib_imm_be,
                             uct_rc_send_handler_t handler,  uct_completion_t *comp,
                             uint8_t send_flags)
{
    uint16_t sn;
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    /* Capture the WQE's sequence number before sw_pi advances in the post */
    sn = txwq->sw_pi;
    uct_rc_mlx5_txqp_dptr_post_iov(&iface->super, UCT_IB_QPT_DCI, txqp,
                                   txwq, opcode, iov, iovcnt,
                                   am_id, am_hdr, am_hdr_len,
                                   rdma_raddr, rdma_rkey,
                                   tag, app_ctx, ib_imm_be,
                                   &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                                   uct_ib_mlx5_wqe_av_size(&ep->av),
                                   MLX5_WQE_CTRL_CQ_UPDATE | send_flags,
                                   UCT_IB_MAX_ZCOPY_LOG_SGE(&iface->super.super.super));

    uct_rc_txqp_add_send_comp(&iface->super.super, txqp, handler, comp, sn,
                              UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY, iov_total_length);
}
77 
/* Post an atomic operation WQE on the DCI bound to @ep.
 *
 * The rkey is first resolved to the raw IB rkey for the endpoint's atomic
 * MR offset (which may also adjust @remote_addr). The result lands in the
 * buffer immediately following the descriptor (desc + 1).
 *
 * @param opcode        mlx5 atomic opcode (FA/CS, plain or masked).
 * @param desc          send descriptor; desc+1 receives the fetched value.
 * @param length        operand size in bytes (4 or 8).
 * @param compare_mask  bits of @compare to compare (masked ops).
 * @param compare       compare operand.
 * @param swap_mask     bits of @swap_add to write (masked ops).
 * @param swap_add      swap or add operand, depending on opcode.
 */
static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_iface_atomic_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
                              unsigned opcode, uct_rc_iface_send_desc_t *desc, unsigned length,
                              uint64_t remote_addr, uct_rkey_t rkey,
                              uint64_t compare_mask, uint64_t compare,
                              uint64_t swap_mask, uint64_t swap_add)
{
    /* Translate the UCT rkey into the device rkey used for atomics;
     * may relocate remote_addr by the atomic MR offset */
    uint32_t ib_rkey = uct_ib_resolve_atomic_rkey(rkey, ep->atomic_mr_offset,
                                                  &remote_addr);

    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    /* Stamp before posting: sw_pi advances inside the post call */
    desc->super.sn = txwq->sw_pi;
    uct_rc_mlx5_txqp_dptr_post(&iface->super, UCT_IB_QPT_DCI, txqp, txwq,
                               opcode, desc + 1, length, &desc->lkey,
                               remote_addr, ib_rkey,
                               compare_mask, compare, swap_mask, swap_add,
                               &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                               uct_ib_mlx5_wqe_av_size(&ep->av),
                               MLX5_WQE_CTRL_CQ_UPDATE, 0, INT_MAX, NULL);

    UCT_TL_EP_STAT_ATOMIC(&ep->super);
    uct_rc_txqp_add_send_op(txqp, &desc->super);
}
103 
/* Common implementation for fire-and-forget (non-fetching) 32/64-bit
 * atomic operations (add/and/or/xor).
 *
 * Translates the UCT atomic opcode/value into the mlx5 opcode and masked
 * operands, then posts it. The RES/OPS check macros may return early with
 * UCS_ERR_NO_RESOURCE or UCS_ERR_UNSUPPORTED.
 *
 * @return UCS_OK on successful post, error status otherwise.
 */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
                              uint64_t value, uint64_t remote_addr, uct_rkey_t rkey)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_rc_iface_send_desc_t *desc;
    int op;
    uint64_t compare_mask;
    uint64_t compare;
    uint64_t swap_mask;
    uint64_t swap;
    int      ext; /* not used here: post-only atomics fetch no result */
    ucs_status_t status;

    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_OPS);

    /* Map (opcode, size, value) onto mlx5 opcode and masked operands */
    status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op, &compare_mask,
                                                  &compare, &swap_mask, &swap, &ext);
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        return status;
    }

    UCT_RC_IFACE_GET_TX_ATOMIC_DESC(&iface->super.super, &iface->super.tx.atomic_desc_mp, desc);
    uct_dc_mlx5_iface_atomic_post(iface, ep, op, desc, size, remote_addr, rkey,
                                  compare_mask, compare, swap_mask, swap);
    return UCS_OK;
}
133 
/* Common implementation for fetching atomics (fetch-add, swap, cswap).
 *
 * Allocates a fetch descriptor whose handler unpacks the fetched value
 * into @result when the operation completes, then posts the atomic.
 *
 * @param opcode   mlx5 atomic opcode (already translated by the caller).
 * @param result   destination for the fetched remote value.
 * @param ext      selects the extended (masked) atomic handler variant.
 * @param length   operand size in bytes (4 or 8).
 * @return UCS_INPROGRESS on successful post (completes via @comp),
 *         or an error from the resource check macro.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_dc_mlx5_ep_atomic_fop(uct_dc_mlx5_ep_t *ep, int opcode, void *result, int ext,
                          unsigned length, uint64_t remote_addr, uct_rkey_t rkey,
                          uint64_t compare_mask, uint64_t compare,
                          uint64_t swap_mask, uint64_t swap_add, uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);
    uct_rc_iface_send_desc_t *desc;

    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_RC_IFACE_GET_TX_ATOMIC_FETCH_DESC(&iface->super.super, &iface->super.tx.atomic_desc_mp,
                                          desc, uct_rc_iface_atomic_handler(&iface->super.super,
                                                                            ext, length),
                                          result, comp);
    uct_dc_mlx5_iface_atomic_post(iface, ep, opcode, desc, length, remote_addr, rkey,
                                  compare_mask, compare, swap_mask, swap_add);
    return UCS_INPROGRESS;
}
152 
/* Common implementation for 32/64-bit fetching atomic operations exposed
 * through the UCT fetch API (add/and/or/xor/swap).
 *
 * Translates the UCT opcode into the mlx5 opcode and masked operands and
 * delegates to uct_dc_mlx5_ep_atomic_fop.
 *
 * @return UCS_INPROGRESS on successful post, error status otherwise.
 */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_atomic_fop_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
                               uint64_t value, void *result,
                               uint64_t remote_addr, uct_rkey_t rkey,
                               uct_completion_t *comp)
{
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    int op;
    uint64_t compare_mask;
    uint64_t compare;
    uint64_t swap_mask;
    uint64_t swap;
    int      ext;
    ucs_status_t status;

    UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_FOPS);

    /* Map (opcode, size, value) onto mlx5 opcode and masked operands;
     * ext selects the extended handler in the fop path */
    status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op, &compare_mask,
                                                  &compare, &swap_mask, &swap, &ext);
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        return status;
    }

    return uct_dc_mlx5_ep_atomic_fop(ep, op, result, ext, size, remote_addr, rkey,
                                     compare_mask, compare, swap_mask, swap, comp);
}
179 
uct_dc_mlx5_ep_atomic_cswap64(uct_ep_h tl_ep,uint64_t compare,uint64_t swap,uint64_t remote_addr,uct_rkey_t rkey,uint64_t * result,uct_completion_t * comp)180 ucs_status_t uct_dc_mlx5_ep_atomic_cswap64(uct_ep_h tl_ep, uint64_t compare, uint64_t swap,
181                                            uint64_t remote_addr, uct_rkey_t rkey,
182                                            uint64_t *result, uct_completion_t *comp)
183 {
184     return uct_dc_mlx5_ep_atomic_fop(ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t),
185                                      MLX5_OPCODE_ATOMIC_CS, result, 0, sizeof(uint64_t),
186                                      remote_addr, rkey, 0, htobe64(compare), UINT64_MAX,
187                                      htobe64(swap), comp);
188 }
189 
uct_dc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep,uint32_t compare,uint32_t swap,uint64_t remote_addr,uct_rkey_t rkey,uint32_t * result,uct_completion_t * comp)190 ucs_status_t uct_dc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep, uint32_t compare, uint32_t swap,
191                                            uint64_t remote_addr, uct_rkey_t rkey,
192                                            uint32_t *result, uct_completion_t *comp)
193 {
194     return uct_dc_mlx5_ep_atomic_fop(ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t),
195                                      MLX5_OPCODE_ATOMIC_MASKED_CS, result, 1,
196                                      sizeof(uint32_t), remote_addr, rkey, UCS_MASK(32),
197                                      htonl(compare), UINT64_MAX, htonl(swap), comp);
198 }
199 
/* UCT entry point: fire-and-forget 32-bit atomic (add/and/or/xor). */
ucs_status_t uct_dc_mlx5_ep_atomic32_post(uct_ep_h ep, unsigned opcode, uint32_t value,
                                          uint64_t remote_addr, uct_rkey_t rkey)
{
    return uct_dc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
}
205 
/* UCT entry point: fire-and-forget 64-bit atomic (add/and/or/xor). */
ucs_status_t uct_dc_mlx5_ep_atomic64_post(uct_ep_h ep, unsigned opcode, uint64_t value,
                                          uint64_t remote_addr, uct_rkey_t rkey)
{
    return uct_dc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
}
211 
/* UCT entry point: 64-bit fetching atomic; the previous remote value is
 * stored in *result when @comp fires. */
ucs_status_t uct_dc_mlx5_ep_atomic64_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
                                           uint64_t value, uint64_t *result,
                                           uint64_t remote_addr, uct_rkey_t rkey,
                                           uct_completion_t *comp)
{
    return uct_dc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
                                          remote_addr, rkey, comp);
}
220 
/* UCT entry point: 32-bit fetching atomic; the previous remote value is
 * stored in *result when @comp fires. */
ucs_status_t uct_dc_mlx5_ep_atomic32_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
                                           uint32_t value, uint32_t *result,
                                           uint64_t remote_addr, uct_rkey_t rkey,
                                           uct_completion_t *comp)
{
    return uct_dc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
                                          remote_addr, rkey, comp);
}
229 
uct_dc_mlx5_ep_fence(uct_ep_h tl_ep,unsigned flags)230 ucs_status_t uct_dc_mlx5_ep_fence(uct_ep_h tl_ep, unsigned flags)
231 {
232     uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
233     uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
234 
235     return uct_rc_ep_fence(tl_ep, &iface->tx.dcis[ep->dci].txwq.fi,
236                            ep->dci != UCT_DC_MLX5_EP_NO_DCI);
237 }
238 
/* Inline-path active-message short send: header + payload are written
 * directly into the WQE. The CHECK macros may return early with
 * UCS_ERR_NO_RESOURCE (or an invalid-param error) before anything is
 * posted; FC window and stats are updated only on success. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_am_short_inline(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
                               const void *buffer, unsigned length)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_RC_MLX5_CHECK_AM_SHORT(id, length, UCT_IB_MLX5_AV_FULL_SIZE);
    UCT_DC_CHECK_RES_AND_FC(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                 txqp, txwq,
                                 MLX5_OPCODE_SEND,
                                 buffer, length, id, hdr, 0,
                                 0, 0,
                                 &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                                 uct_ib_mlx5_wqe_av_size(&ep->av),
                                 MLX5_WQE_CTRL_SOLICITED, INT_MAX);

    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, sizeof(hdr) + length);
    return UCS_OK;
}
265 
266 #if HAVE_IBV_DM
/* Device-memory (DM) short-send helper: stage @cache (header, hdr_len
 * bytes) plus @payload into a DM buffer (or a host bounce buffer when DM
 * staging is not possible) and post it as a bcopy WQE.
 *
 * @param cache      pre-built header to prepend; may be NULL if hdr_len == 0.
 * @param opcode     SEND for AM, RDMA_WRITE for put.
 * @param fm_ce_se   WQE control flags (solicited / CQ-update bits).
 * @param rdma_raddr remote address (RDMA opcode only).
 * @param rdma_rkey  remote key (RDMA opcode only).
 * @return UCS_OK on success, or the error from DM data preparation.
 */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_short_dm(uct_dc_mlx5_ep_t *ep, uct_rc_mlx5_dm_copy_data_t *cache,
                        size_t hdr_len, const void *payload, unsigned length,
                        unsigned opcode, uint8_t fm_ce_se,
                        uint64_t rdma_raddr, uct_rkey_t rdma_rkey)
{
    uct_dc_mlx5_iface_t *iface     = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);
    uct_rc_iface_send_desc_t *desc = NULL;
    void *buffer;
    ucs_status_t status;
    uct_ib_log_sge_t log_sge;

    /* Copy header+payload into the DM segment and build the send desc */
    status = uct_rc_mlx5_common_dm_make_data(&iface->super, cache, hdr_len,
                                             payload, length, &desc,
                                             &buffer, &log_sge);
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        return status;
    }

    uct_dc_mlx5_iface_bcopy_post(iface, ep, opcode,
                                 hdr_len + length,
                                 rdma_raddr, rdma_rkey,
                                 desc, fm_ce_se, 0, buffer,
                                 /* pass SGE info for logging only if present */
                                 log_sge.num_sge ? &log_sge : NULL);
    return UCS_OK;
}
293 #endif
294 
/* UCT entry point: active-message short send.
 *
 * Fast path: when the payload plus the AM short header fits into an
 * inline WQE, or the iface has no device-memory segment, take the inline
 * path. Otherwise stage the message through device memory (DM).
 * Without HAVE_IBV_DM the function is only the inline call. */
ucs_status_t uct_dc_mlx5_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
                                     const void *buffer, unsigned length)
{
#if HAVE_IBV_DM
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    ucs_status_t status;
    uct_rc_mlx5_dm_copy_data_t cache;

    if (ucs_likely((sizeof(uct_rc_mlx5_am_short_hdr_t) + length <=
                    UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) ||
                   !iface->super.dm.dm)) {
#endif
        return uct_dc_mlx5_ep_am_short_inline(tl_ep, id, hdr, buffer, length);
#if HAVE_IBV_DM
    }

    UCT_CHECK_AM_ID(id);
    UCT_CHECK_LENGTH(length + sizeof(uct_rc_mlx5_am_short_hdr_t), 0,
                     iface->super.dm.seg_len, "am_short");
    UCT_DC_CHECK_RES_AND_FC(iface, ep);

    /* Build the AM header (id + 64-bit user header) in the DM copy cache */
    uct_rc_mlx5_am_hdr_fill(&cache.am_hdr.rc_hdr, id);
    cache.am_hdr.am_hdr = hdr;

    status = uct_dc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.am_hdr), buffer, length,
                                     MLX5_OPCODE_SEND,
                                     MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
                                     0, 0);
    if (UCS_STATUS_IS_ERR(status)) {
        return status;
    }
    /* Stats and flow-control window are charged only after a successful post */
    UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, sizeof(cache.am_hdr) + length);
    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    return UCS_OK;
#endif
}
332 
/* UCT entry point: active-message buffered-copy send.
 *
 * The payload is produced by @pack_cb into a descriptor taken from the TX
 * mpool (with the RC/mlx5 AM header prepended by the GET_DESC macro) and
 * posted as a single SEND WQE.
 *
 * @return packed payload length on success, or a negative/error status
 *         propagated by the CHECK macros (e.g. no resources / FC window).
 */
ssize_t uct_dc_mlx5_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,
                                uct_pack_callback_t pack_cb, void *arg,
                                unsigned flags)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_rc_iface_send_desc_t *desc;
    size_t length;

    UCT_CHECK_AM_ID(id);
    UCT_DC_CHECK_RES_AND_FC(iface, ep);
    UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC(&iface->super.super, &iface->super.super.tx.mp, desc,
                                      id, uct_rc_mlx5_am_hdr_fill, uct_rc_mlx5_hdr_t,
                                      pack_cb, arg, &length);

    /* desc + 1 is the packed buffer (header + payload) built by the macro */
    uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_SEND,
                                 sizeof(uct_rc_mlx5_hdr_t) + length, 0, 0, desc,
                                 MLX5_WQE_CTRL_SOLICITED, 0, desc + 1, NULL);

    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    UCT_TL_EP_STAT_OP(&ep->super, AM, BCOPY, length);
    return length;
}
356 
/* UCT entry point: active-message zero-copy send.
 *
 * The user header is copied into the WQE while the IOV payload is sent
 * directly from its registered memory.
 *
 * @return UCS_INPROGRESS on successful post (completes via @comp), or an
 *         error from the CHECK macros (iov size / length / resources).
 */
ucs_status_t uct_dc_mlx5_ep_am_zcopy(uct_ep_h tl_ep, uint8_t id, const void *header,
                                     unsigned header_length, const uct_iov_t *iov,
                                     size_t iovcnt, unsigned flags,
                                     uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_IB_MLX5_AM_ZCOPY_MAX_IOV,
                       "uct_dc_mlx5_ep_am_zcopy");
    UCT_RC_MLX5_CHECK_AM_ZCOPY(id, header_length, uct_iov_total_length(iov, iovcnt),
                               iface->super.super.super.config.seg_size,
                               UCT_IB_MLX5_AV_FULL_SIZE);
    UCT_DC_CHECK_RES_AND_FC(iface, ep);

    uct_dc_mlx5_iface_zcopy_post(iface, ep, MLX5_OPCODE_SEND, iov, iovcnt, 0ul,
                                 id, header, header_length, 0, 0, 0ul, 0, 0,
                                 uct_rc_ep_send_op_completion_handler,
                                 comp, MLX5_WQE_CTRL_SOLICITED);

    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    UCT_TL_EP_STAT_OP(&ep->super, AM, ZCOPY, header_length +
                      uct_iov_total_length(iov, iovcnt));

    return UCS_INPROGRESS;
}
383 
/* Inline-path RDMA-write short: the payload is placed directly into the
 * WQE. A put fence is applied first (may rewrite rkey/remote_addr for the
 * atomic MR offset). CHECK macros may return early with no WQE posted. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_put_short_inline(uct_ep_h tl_ep, const void *buffer,
                                unsigned length, uint64_t remote_addr,
                                uct_rkey_t rkey)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_RC_MLX5_CHECK_PUT_SHORT(length, UCT_IB_MLX5_AV_FULL_SIZE);
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);
    uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                 txqp, txwq, MLX5_OPCODE_RDMA_WRITE,
                                 buffer, length, 0, 0, 0, remote_addr, rkey,
                                 &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                                 uct_ib_mlx5_wqe_av_size(&ep->av), 0, INT_MAX);

    UCT_TL_EP_STAT_OP(&ep->super, PUT, SHORT, length);

    return UCS_OK;
}
409 
/* UCT entry point: RDMA-write short.
 *
 * Fast path: when the payload fits into an inline WQE, or the iface has
 * no device-memory segment, take the inline path. Otherwise stage the
 * payload through device memory (DM) and post a bcopy RDMA write.
 * Without HAVE_IBV_DM the function is only the inline call. */
ucs_status_t uct_dc_mlx5_ep_put_short(uct_ep_h tl_ep, const void *payload,
                                      unsigned length, uint64_t remote_addr,
                                      uct_rkey_t rkey)
{
#if HAVE_IBV_DM
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
    ucs_status_t status;

    if (ucs_likely((length <= UCT_IB_MLX5_PUT_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) ||
                   !iface->super.dm.dm)) {
#endif
        return uct_dc_mlx5_ep_put_short_inline(tl_ep, payload, length, remote_addr, rkey);
#if HAVE_IBV_DM
    }

    UCT_CHECK_LENGTH(length, 0, iface->super.dm.seg_len, "put_short");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* Apply put fence; may rewrite rkey/remote_addr for the atomic MR offset */
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);
    status = uct_dc_mlx5_ep_short_dm(ep, NULL, 0, payload, length,
                                     MLX5_OPCODE_RDMA_WRITE,
                                     MLX5_WQE_CTRL_CQ_UPDATE,
                                     remote_addr, rkey);
    if (UCS_STATUS_IS_ERR(status)) {
        return status;
    }
    UCT_TL_EP_STAT_OP(&ep->super, PUT, SHORT, length);
    return UCS_OK;
#endif
}
443 
/* UCT entry point: RDMA-write buffered-copy.
 *
 * The payload is produced by @pack_cb into a TX mpool descriptor and
 * posted as a single RDMA write; a put fence is applied first.
 *
 * @return packed payload length on success, or an error propagated by the
 *         resource-check macro.
 */
ssize_t uct_dc_mlx5_ep_put_bcopy(uct_ep_h tl_ep, uct_pack_callback_t pack_cb,
                                 void *arg, uint64_t remote_addr, uct_rkey_t rkey)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
    uct_rc_iface_send_desc_t *desc;
    size_t length;

    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_RC_IFACE_GET_TX_PUT_BCOPY_DESC(&iface->super.super, &iface->super.super.tx.mp,
                                       desc, pack_cb, arg, length);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* Apply put fence; may rewrite rkey/remote_addr for the atomic MR offset */
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);
    /* desc + 1 is the packed payload buffer */
    uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_RDMA_WRITE, length,
                                 remote_addr, rkey, desc, 0, 0, desc + 1, NULL);
    UCT_TL_EP_STAT_OP(&ep->super, PUT, BCOPY, length);
    return length;
}
464 
/* UCT entry point: RDMA-write zero-copy from registered IOV memory.
 *
 * @return UCS_INPROGRESS on successful post (completes via @comp), or an
 *         error from the CHECK macros (iov size / length / resources).
 */
ucs_status_t uct_dc_mlx5_ep_put_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov, size_t iovcnt,
                                      uint64_t remote_addr, uct_rkey_t rkey,
                                      uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_RMA_MAX_IOV(UCT_IB_MLX5_AV_FULL_SIZE),
                       "uct_dc_mlx5_ep_put_zcopy");
    UCT_CHECK_LENGTH(uct_iov_total_length(iov, iovcnt), 0, UCT_IB_MAX_MESSAGE_SIZE,
                     "put_zcopy");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* Apply put fence; may rewrite rkey/remote_addr for the atomic MR offset */
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);

    uct_dc_mlx5_iface_zcopy_post(iface, ep, MLX5_OPCODE_RDMA_WRITE, iov, iovcnt,
                                 0ul, 0, NULL, 0, remote_addr, rkey, 0ul, 0, 0,
                                 uct_rc_ep_send_op_completion_handler, comp, 0);

    UCT_TL_EP_STAT_OP(&ep->super, PUT, ZCOPY,
                      uct_iov_total_length(iov, iovcnt));
    return UCS_INPROGRESS;
}
490 
/* UCT entry point: RDMA-read buffered-copy.
 *
 * Posts an RDMA read into a TX mpool descriptor; @unpack_cb is invoked
 * on the received data when the read completes. A get fence may add
 * control bits (fm_ce_se) to the WQE.
 *
 * @return UCS_INPROGRESS on successful post (completes via @comp), or an
 *         error from the CHECK macros.
 */
ucs_status_t uct_dc_mlx5_ep_get_bcopy(uct_ep_h tl_ep,
                                      uct_unpack_callback_t unpack_cb,
                                      void *arg, size_t length,
                                      uint64_t remote_addr, uct_rkey_t rkey,
                                      uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uint8_t fm_ce_se           = 0;
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
    uct_rc_iface_send_desc_t *desc;

    UCT_CHECK_LENGTH(length, 0, iface->super.super.super.config.seg_size,
                     "get_bcopy");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC(&iface->super.super,
                                       &iface->super.super.tx.mp,
                                       desc, unpack_cb, comp, arg, length);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* Get fence may rewrite rkey and set extra WQE control bits */
    uct_rc_mlx5_ep_fence_get(&iface->super, txwq, &rkey, &fm_ce_se);

    /* desc + 1 is the landing buffer for the read data */
    uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_RDMA_READ, length,
                                 remote_addr, rkey, desc, fm_ce_se, 0,
                                 desc + 1, NULL);

    UCT_RC_RDMA_READ_POSTED(&iface->super.super, length);
    UCT_TL_EP_STAT_OP(&ep->super, GET, BCOPY, length);

    return UCS_INPROGRESS;
}
521 
522 
/* UCT entry point: RDMA-read zero-copy into registered IOV memory.
 *
 * The minimum length check reflects that reads up to the inline-CQE size
 * are expected to be served by other paths (per the max_inl_cqe bound).
 *
 * @return UCS_INPROGRESS on successful post (completes via @comp), or an
 *         error from the CHECK macros.
 */
ucs_status_t uct_dc_mlx5_ep_get_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov,
                                      size_t iovcnt, uint64_t remote_addr,
                                      uct_rkey_t rkey, uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uint8_t fm_ce_se           = 0;
    size_t total_length        = uct_iov_total_length(iov, iovcnt);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_RMA_MAX_IOV(UCT_IB_MLX5_AV_FULL_SIZE),
                       "uct_dc_mlx5_ep_get_zcopy");
    UCT_CHECK_LENGTH(total_length,
                     iface->super.super.super.config.max_inl_cqe[UCT_IB_DIR_TX] + 1,
                     iface->super.super.config.max_get_zcopy, "get_zcopy");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* Get fence may rewrite rkey and set extra WQE control bits */
    uct_rc_mlx5_ep_fence_get(&iface->super, txwq, &rkey, &fm_ce_se);

    uct_dc_mlx5_iface_zcopy_post(iface, ep, MLX5_OPCODE_RDMA_READ, iov, iovcnt,
                                 total_length, 0, NULL, 0, remote_addr, rkey,
                                 0ul, 0, 0,
                                 uct_rc_ep_get_zcopy_completion_handler, comp,
                                 fm_ce_se);

    UCT_RC_RDMA_READ_POSTED(&iface->super.super, total_length);
    UCT_TL_EP_STAT_OP(&ep->super, GET, ZCOPY, total_length);

    return UCS_INPROGRESS;
}
553 
/* UCT entry point: flush (or cancel) outstanding operations on the ep.
 *
 * UCT_FLUSH_FLAG_CANCEL: purge pending requests and, if a DCI is owned,
 * force error handling on it to abort in-flight WQEs (unsupported for the
 * randomized-DCI policy). Otherwise the normal flush path returns:
 *   - UCS_ERR_NO_RESOURCE while TX resources / DCI / send window are
 *     unavailable (caller must retry),
 *   - UCS_OK when there is provably nothing in flight,
 *   - UCS_INPROGRESS with @comp registered against the DCI's current
 *     signaled producer index otherwise.
 */
ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
                                  uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t    *ep    = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    ucs_status_t        status;
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
        if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
            /* With randomized DCI selection there is no single DCI to cancel */
            return UCS_ERR_UNSUPPORTED;
        }

        uct_ep_pending_purge(tl_ep, NULL, 0);
        if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
            /* No dci -> no WQEs -> HW is clean, nothing to cancel */
            return UCS_OK;
        }

        /* Force the error path to abort outstanding WQEs on the DCI */
        uct_dc_mlx5_ep_handle_failure(ep, NULL, UCS_ERR_CANCELED);
        return UCS_OK;
    }

    if (!uct_dc_mlx5_iface_has_tx_resources(iface)) {
        return UCS_ERR_NO_RESOURCE;
    }

    if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
        if (!uct_dc_mlx5_iface_dci_can_alloc(iface)) {
            return UCS_ERR_NO_RESOURCE; /* waiting for dci */
        } else {
            UCT_TL_EP_STAT_FLUSH(&ep->super); /* no sends */
            return UCS_OK;
        }
    }

    if (!uct_dc_mlx5_iface_dci_ep_can_send(ep)) {
        return UCS_ERR_NO_RESOURCE; /* cannot send */
    }

    status = uct_dc_mlx5_iface_flush_dci(iface, ep->dci);
    if (status == UCS_OK) {
        UCT_TL_EP_STAT_FLUSH(&ep->super);
        return UCS_OK; /* all sends completed */
    }

    ucs_assert(status == UCS_INPROGRESS);
    ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    /* Register @comp to fire once the signaled producer index completes */
    return uct_rc_txqp_add_flush_comp(&iface->super.super, &ep->super, txqp,
                                      comp, txwq->sig_pi);
}
608 
609 #if IBV_HW_TM
/*
 * Post a tag-matching eager short send with the payload inlined into the WQE.
 * The payload plus the TM header (struct ibv_tmh) must fit into the
 * inline-short limit for a full-size address vector.
 */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_tag_eager_short_inline(uct_ep_h tl_ep, uct_tag_t tag,
                                      const void *data, size_t length)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
                     UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE),
                     "uct_dc_mlx5_ep_tag_short");
    UCT_DC_MLX5_CHECK_RES(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    /* IBV_TMH_EAGER: the message is fully contained in this send */
    uct_rc_mlx5_txqp_tag_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND, data, length,
                                     NULL, tag, 0, IBV_TMH_EAGER, 0,
                                     &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                                     uct_ib_mlx5_wqe_av_size(&ep->av), NULL, 0,
                                     MLX5_WQE_CTRL_SOLICITED);

    UCT_TL_EP_STAT_OP(&ep->super, TAG, SHORT, length);

    return UCS_OK;
}
636 
/*
 * Tag-matching eager short send. If device memory (DM) is compiled in and the
 * payload exceeds the inline-short limit (but fits the DM segment), stage the
 * TM header + data through DM; otherwise use the inline WQE path.
 * Note the #if interleaving: without HAVE_IBV_DM the function body is only
 * the inline-path call.
 */
ucs_status_t uct_dc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag,
                                            const void *data, size_t length)
{
#if HAVE_IBV_DM
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_rc_mlx5_dm_copy_data_t cache;
    ucs_status_t status;

    /* Prefer the inline path when the message fits, or when DM is absent */
    if (ucs_likely((sizeof(struct ibv_tmh) + length <=
                    UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) ||
                   !iface->super.dm.dm)) {
#endif
        return uct_dc_mlx5_ep_tag_eager_short_inline(tl_ep, tag, data, length);
#if HAVE_IBV_DM
    }

    UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
                     iface->super.dm.seg_len, "tag_short");
    UCT_DC_MLX5_CHECK_RES(iface, ep);

    /* Build the TM header in the bounce buffer, then send header+data via DM */
    uct_rc_mlx5_fill_tmh(ucs_unaligned_ptr(&cache.tm_hdr), tag, 0, IBV_TMH_EAGER);

    status = uct_dc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.tm_hdr), data,
                                     length, MLX5_OPCODE_SEND,
                                     MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
                                     0, 0);
    if (!UCS_STATUS_IS_ERR(status)) {
        UCT_TL_EP_STAT_OP(&ep->super, TAG, SHORT, length);
    }

    return status;
#endif
}
671 
/*
 * Tag-matching eager bcopy send: pack the payload into a TX descriptor via
 * pack_cb and post it. Returns the packed length, or a negative status on
 * resource shortage (via UCT_DC_MLX5_CHECK_RES).
 */
ssize_t uct_dc_mlx5_ep_tag_eager_bcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                       uint64_t imm,
                                       uct_pack_callback_t pack_cb,
                                       void *arg, unsigned flags)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_rc_iface_send_desc_t *desc;
    uint32_t app_ctx, ib_imm;
    int opcode;
    size_t length;

    UCT_DC_MLX5_CHECK_RES(iface, ep);

    /* Split the 64-bit user immediate into IB immediate + app context, and
     * select SEND vs SEND_IMM accordingly */
    UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND,
                            _IMM);

    /* Descriptor is packed with the TM header followed by user data */
    UCT_RC_MLX5_IFACE_GET_TM_BCOPY_DESC(&iface->super.super,
                                        iface->super.tm.bcopy_mp,
                                        desc, tag, app_ctx, pack_cb,
                                        arg, length);

    uct_dc_mlx5_iface_bcopy_post(iface, ep, opcode,
                                 sizeof(struct ibv_tmh) + length,
                                 0, 0, desc, MLX5_WQE_CTRL_SOLICITED, ib_imm,
                                 desc + 1, NULL);

    UCT_TL_EP_STAT_OP(&ep->super, TAG, BCOPY, length);

    return length;
}
703 
/*
 * Tag-matching eager zcopy send: post the user iov directly (no copy) and
 * invoke comp when the send completes. Always returns UCS_INPROGRESS on a
 * successful post.
 */
ucs_status_t uct_dc_mlx5_ep_tag_eager_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                            uint64_t imm, const uct_iov_t *iov,
                                            size_t iovcnt, unsigned flags,
                                            uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uint32_t app_ctx, ib_imm;
    int opcode;

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_TM_EAGER_ZCOPY_MAX_IOV(UCT_IB_MLX5_AV_FULL_SIZE),
                       "uct_dc_mlx5_ep_tag_eager_zcopy");
    UCT_RC_CHECK_ZCOPY_DATA(sizeof(struct ibv_tmh),
                            uct_iov_total_length(iov, iovcnt),
                            iface->super.tm.max_zcopy);

    UCT_DC_MLX5_CHECK_RES(iface, ep);

    /* Map the 64-bit user immediate onto IB immediate + app context */
    UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND, _IMM);

    /* UCT_RC_MLX5_OPCODE_FLAG_TM requests a TM header to be prepended */
    uct_dc_mlx5_iface_zcopy_post(iface, ep, opcode|UCT_RC_MLX5_OPCODE_FLAG_TM,
                                 iov, iovcnt, 0ul, 0, "", 0, 0, 0, tag, app_ctx,
                                 ib_imm, uct_rc_ep_send_op_completion_handler,
                                 comp, MLX5_WQE_CTRL_SOLICITED);

    UCT_TL_EP_STAT_OP(&ep->super, TAG, ZCOPY,
                      uct_iov_total_length(iov, iovcnt));

    return UCS_INPROGRESS;
}
734 
/*
 * Initiate a hardware tag-matching rendezvous: send an RTS carrying the
 * user header plus TMH/RVH/RAVH control headers inline. On success the
 * returned value encodes the rendezvous operation index (not a pointer),
 * which the caller may later use to cancel the operation.
 */
ucs_status_ptr_t uct_dc_mlx5_ep_tag_rndv_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                               const void *header,
                                               unsigned header_length,
                                               const uct_iov_t *iov,
                                               size_t iovcnt, unsigned flags,
                                               uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    unsigned tm_hdr_len        = sizeof(struct ibv_tmh) +
                                 sizeof(struct ibv_rvh) +
                                 sizeof(struct ibv_ravh);
    struct ibv_ravh ravh;
    uint32_t op_index;
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_RC_MLX5_CHECK_RNDV_PARAMS(iovcnt, header_length, tm_hdr_len,
                                   UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE),
                                   iface->super.tm.max_rndv_data +
                                   UCT_RC_MLX5_TMH_PRIV_LEN);
    UCT_DC_CHECK_RES_PTR(iface, ep);

    /* Reserve an operation id and associate the user completion with it */
    op_index = uct_rc_mlx5_tag_get_op_id(&iface->super, comp);

    /* RAVH carries our DCT number so the receiver can RDMA-read from us */
    uct_dc_mlx5_iface_fill_ravh(&ravh, iface->rx.dct.qp_num);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    uct_rc_mlx5_txqp_tag_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND, header,
                                     header_length, iov, tag, op_index,
                                     IBV_TMH_RNDV, 0, &ep->av,
                                     uct_dc_mlx5_ep_get_grh(ep),
                                     uct_ib_mlx5_wqe_av_size(&ep->av), &ravh,
                                     sizeof(ravh), MLX5_WQE_CTRL_SOLICITED);

    /* Return the op index encoded as a status pointer */
    return (ucs_status_ptr_t)((uint64_t)op_index);
}
773 
/*
 * Send a software rendezvous request: the header is delivered to the peer as
 * an eager message with immediate (SEND_IMM) and the rendezvous protocol is
 * handled by the upper layer; no hardware RNDV state is created here.
 */
ucs_status_t uct_dc_mlx5_ep_tag_rndv_request(uct_ep_h tl_ep, uct_tag_t tag,
                                             const void* header,
                                             unsigned header_length,
                                             unsigned flags)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_CHECK_LENGTH(header_length + sizeof(struct ibv_tmh), 0,
                     UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE),
                     "tag_rndv_request");
    UCT_DC_MLX5_CHECK_RES(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    uct_rc_mlx5_txqp_tag_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND_IMM, header,
                                     header_length, NULL, tag, 0,
                                     IBV_TMH_EAGER, 0, &ep->av,
                                     uct_dc_mlx5_ep_get_grh(ep),
                                     uct_ib_mlx5_wqe_av_size(&ep->av), NULL, 0,
                                     MLX5_WQE_CTRL_SOLICITED);
    return UCS_OK;
}
799 
uct_dc_mlx5_iface_tag_recv_zcopy(uct_iface_h tl_iface,uct_tag_t tag,uct_tag_t tag_mask,const uct_iov_t * iov,size_t iovcnt,uct_tag_context_t * ctx)800 ucs_status_t uct_dc_mlx5_iface_tag_recv_zcopy(uct_iface_h tl_iface,
801                                               uct_tag_t tag,
802                                               uct_tag_t tag_mask,
803                                               const uct_iov_t *iov,
804                                               size_t iovcnt,
805                                               uct_tag_context_t *ctx)
806 {
807     uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
808 
809     return uct_rc_mlx5_iface_common_tag_recv(&iface->super, tag, tag_mask,
810                                              iov, iovcnt, ctx);
811 }
812 
uct_dc_mlx5_iface_tag_recv_cancel(uct_iface_h tl_iface,uct_tag_context_t * ctx,int force)813 ucs_status_t uct_dc_mlx5_iface_tag_recv_cancel(uct_iface_h tl_iface,
814                                                uct_tag_context_t *ctx,
815                                                int force)
816 {
817    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
818 
819    return uct_rc_mlx5_iface_common_tag_recv_cancel(&iface->super, ctx, force);
820 }
821 #endif
822 
/*
 * Send a flow-control message on the endpoint:
 *  - UCT_RC_EP_FC_PURE_GRANT: grant credits back to the original sender,
 *    building an address vector from the data carried in the request
 *    (possibly creating an AH for the global-routing case).
 *  - UCT_RC_EP_FC_FLAG_HARD_REQ: request a grant from the peer, embedding our
 *    own endpoint pointer, GID and DCT number so the peer can reply.
 */
ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
                                    uct_rc_fc_request_t *req)
{
    uct_dc_mlx5_ep_t *dc_ep    = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface,
                                                uct_dc_mlx5_iface_t);
    uct_ib_iface_t *ib_iface   = &iface->super.super.super;
    struct ibv_ah_attr ah_attr = {.is_global = 0};
    uct_dc_fc_sender_data_t sender;
    uct_dc_fc_request_t *dc_req;
    struct mlx5_wqe_av mlx5_av;
    uct_ib_mlx5_base_av_t av;
    ucs_status_t status;
    uintptr_t sender_ep;
    struct ibv_ah *ah;

    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    /* The FC payload (op byte + sender ep pointer) must fit in the space
     * reserved for a full-size AV */
    ucs_assert((sizeof(uint8_t) + sizeof(sender_ep)) <=
                UCT_IB_MLX5_AV_FULL_SIZE);

    UCT_DC_MLX5_CHECK_RES(iface, dc_ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, dc_ep, txqp, txwq);

    /* ucs_derived_of is pointer adjustment only; dc_req is dereferenced
     * solely on the PURE_GRANT path below */
    dc_req = ucs_derived_of(req, uct_dc_fc_request_t);

    if (op == UCT_RC_EP_FC_PURE_GRANT) {
        ucs_assert(req != NULL);

        sender_ep = (uintptr_t)dc_req->sender.ep;

        /* TODO: look at common code with uct_ud_mlx5_iface_get_av */
        if (dc_req->sender.global.is_global) {
            /* Peer requires GRH: build an AH to obtain the mlx5 AV/GRH */
            uct_ib_iface_fill_ah_attr_from_gid_lid(ib_iface, dc_req->lid,
                                                   ucs_unaligned_ptr(&dc_req->sender.global.gid),
                                                   iface->super.super.super.gid_info.gid_index,
                                                   0, &ah_attr);

            status = uct_ib_iface_create_ah(ib_iface, &ah_attr, &ah);
            if (status != UCS_OK) {
                return status;
            }

            uct_ib_mlx5_get_av(ah, &mlx5_av);
        }

        /* Note av initialization is copied from exp verbs */
        av.stat_rate_sl = ib_iface->config.sl; /* (attr->static_rate << 4) | attr->sl */
        av.fl_mlid      = ib_iface->path_bits[0] & 0x7f;

        /* lid in dc_req is in BE already  */
        if (uct_ib_iface_is_roce(ib_iface)) {
            av.rlid     = htons(UCT_IB_ROCE_UDP_SRC_PORT_BASE);
        } else {
            av.rlid     = dc_req->lid | htons(ib_iface->path_bits[0]);
        }
        av.dqp_dct      = htonl(dc_req->dct_num);

        /* Extended AV is required when compact AVs are disabled or GRH is
         * present */
        if (!iface->ud_common.config.compact_av || ah_attr.is_global) {
            av.dqp_dct |= UCT_IB_MLX5_EXTENDED_UD_AV;
        }

        /* mlx5_av is only read when ah_attr.is_global was set above, in
         * which case uct_ib_mlx5_get_av() initialized it */
        uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND,
                                     &av /*dummy*/, 0, op, sender_ep, 0,
                                     0, 0,
                                     &av, ah_attr.is_global ? mlx5_av_grh(&mlx5_av) : NULL,
                                     uct_ib_mlx5_wqe_av_size(&av), 0, INT_MAX);
    } else {
        ucs_assert(op == UCT_RC_EP_FC_FLAG_HARD_REQ);
        /* Embed our identity so the peer can send the grant back */
        sender.ep               = (uint64_t)dc_ep;
        sender.global.gid       = ib_iface->gid_info.gid;
        sender.global.is_global = dc_ep->flags & UCT_DC_MLX5_EP_FLAG_GRH;

        UCS_STATS_UPDATE_COUNTER(dc_ep->fc.stats,
                                 UCT_RC_FC_STAT_TX_HARD_REQ, 1);

        /* Our DCT number is passed as the immediate value */
        uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND_IMM,
                                     &sender.global, sizeof(sender.global), op, sender.ep,
                                     iface->rx.dct.qp_num,
                                     0, 0,
                                     &dc_ep->av,
                                     uct_dc_mlx5_ep_get_grh(dc_ep),
                                     uct_ib_mlx5_wqe_av_size(&dc_ep->av),
                                     MLX5_WQE_CTRL_SOLICITED, INT_MAX);
    }

    return UCS_OK;
}
913 
914 
/*
 * Endpoint constructor: store the remote atomic-MR offset, merge the remote
 * DCT number into the base address vector, and run basic DC initialization
 * (DCI assignment policy setup etc.).
 */
UCS_CLASS_INIT_FUNC(uct_dc_mlx5_ep_t, uct_dc_mlx5_iface_t *iface,
                    const uct_dc_mlx5_iface_addr_t *if_addr,
                    uct_ib_mlx5_base_av_t *av)
{
    uint32_t remote_dctn;

    ucs_trace_func("");

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super.super.super);

    self->atomic_mr_offset = uct_ib_md_atomic_offset(if_addr->atomic_mr_id);
    remote_dctn            = uct_ib_unpack_uint24(if_addr->qp_num);

    /* Copy the template AV and OR-in the destination DCT number (BE) */
    memcpy(&self->av, av, sizeof(*av));
    self->av.dqp_dct |= htonl(remote_dctn);

    return uct_dc_mlx5_ep_basic_init(iface, self);
}
933 
/*
 * Endpoint destructor: purge pending requests and FC state; if the endpoint
 * still owns a DCI, clean its arbiter group, cancel outstanding operations
 * and detach the endpoint from the DCI.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(self->super.super.iface, uct_dc_mlx5_iface_t);

    uct_dc_mlx5_ep_pending_purge(&self->super.super, NULL, NULL);
    uct_rc_fc_cleanup(&self->fc);

    /* Invalid (GC-listed) endpoints are destroyed via uct_dc_mlx5_ep_release */
    ucs_assert_always(self->flags & UCT_DC_MLX5_EP_FLAG_VALID);

    if ((self->dci == UCT_DC_MLX5_EP_NO_DCI) ||
        uct_dc_mlx5_iface_is_dci_rand(iface)) {
        return;
    }

    /* TODO: this is good for dcs policy only.
     * Need to change if eps share dci
     */
    ucs_arbiter_group_cleanup(uct_dc_mlx5_ep_arb_group(iface, self));
    /* An ep may still hold a dci at destroy time only while the dci has
     * outstanding operations; otherwise the dci was leaked */
    ucs_assertv_always(uct_dc_mlx5_iface_dci_has_outstanding(iface, self->dci),
                       "iface (%p) ep (%p) dci leak detected: dci=%d", iface,
                       self, self->dci);

    /* we can handle it but well behaving app should not do this */
    ucs_debug("ep (%p) is destroyed with %d outstanding ops",
              self, (int16_t)iface->super.super.config.tx_qp_len -
              uct_rc_txqp_available(&iface->tx.dcis[self->dci].txqp));
    uct_rc_txqp_purge_outstanding(&iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED, 1);
    iface->tx.dcis[self->dci].ep = NULL;
}
963 
/* Class definition and heap-allocating constructor for the base DC endpoint */
UCS_CLASS_DEFINE(uct_dc_mlx5_ep_t, uct_base_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_dc_mlx5_ep_t, uct_ep_t, uct_dc_mlx5_iface_t *,
                          const uct_dc_mlx5_iface_addr_t *,
                          uct_ib_mlx5_base_av_t *);
968 
/*
 * Constructor for the GRH-capable endpoint variant: initializes the base DC
 * endpoint, marks the endpoint as requiring a GRH, and stores the GRH
 * address vector for global routing.
 */
UCS_CLASS_INIT_FUNC(uct_dc_mlx5_grh_ep_t, uct_dc_mlx5_iface_t *iface,
                    const uct_dc_mlx5_iface_addr_t *if_addr,
                    uct_ib_mlx5_base_av_t *av,
                    struct mlx5_grh_av *grh_av)
{
    ucs_trace_func("");

    UCS_CLASS_CALL_SUPER_INIT(uct_dc_mlx5_ep_t, iface, if_addr, av);

    self->super.flags |= UCT_DC_MLX5_EP_FLAG_GRH;
    memcpy(&self->grh_av, grh_av, sizeof(*grh_av));
    return UCS_OK;
}
982 
/* GRH endpoint destructor: no extra state beyond the base class to release */
UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_grh_ep_t)
{
    ucs_trace_func("");
}

/* Class definition and heap-allocating constructor for the GRH endpoint */
UCS_CLASS_DEFINE(uct_dc_mlx5_grh_ep_t, uct_dc_mlx5_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_dc_mlx5_grh_ep_t, uct_ep_t, uct_dc_mlx5_iface_t *,
                          const uct_dc_mlx5_iface_addr_t *,
                          uct_ib_mlx5_base_av_t *, struct mlx5_grh_av *);
992 
/*
 * Destroy an endpoint through its class destructor. If the endpoint is still
 * waiting for an FC grant its memory cannot be freed yet (a grant may still
 * arrive for it): mark it invalid and park it on the iface garbage-collection
 * list instead; it is freed later by uct_dc_mlx5_ep_release().
 */
void uct_dc_mlx5_ep_cleanup(uct_ep_h tl_ep, ucs_class_t *cls)
{
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);

    UCS_CLASS_CLEANUP_CALL(cls, ep);

    if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
        ucs_trace("not releasing dc_mlx5_ep %p - waiting for grant", ep);
        ep->flags &= ~UCT_DC_MLX5_EP_FLAG_VALID;
        /* No need to wait for grant on this ep anymore */
        uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep);
        ucs_list_add_tail(&iface->tx.gc_list, &ep->list);
    } else {
        ucs_free(ep);
    }
}
1010 
/*
 * Free an endpoint previously parked on the garbage-collection list by
 * uct_dc_mlx5_ep_cleanup(); such endpoints must already be marked invalid.
 */
void uct_dc_mlx5_ep_release(uct_dc_mlx5_ep_t *ep)
{
    ucs_assert_always(!(ep->flags & UCT_DC_MLX5_EP_FLAG_VALID));
    ucs_debug("release dc_mlx5_ep %p", ep);
    ucs_list_del(&ep->list);
    ucs_free(ep);
}
1018 
/*
 * Queue a pending request on the endpoint's arbiter group and schedule the
 * group on the appropriate wait queue: the DCI-allocation waitq when the
 * endpoint has no DCI yet, or the TX waitq otherwise. With push_to_head the
 * request is prepended (used for internal re-queues).
 */
void uct_dc_mlx5_ep_pending_common(uct_dc_mlx5_iface_t *iface,
                                   uct_dc_mlx5_ep_t *ep, uct_pending_req_t *r,
                                   unsigned flags, int push_to_head)
{
    int no_dci = (ep->dci == UCT_DC_MLX5_EP_NO_DCI);
    ucs_arbiter_group_t *group;

    UCS_STATIC_ASSERT(sizeof(uct_dc_mlx5_pending_req_priv) <=
                      UCT_PENDING_REQ_PRIV_LEN);

    if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
        /* Rand policy shares arbiter groups, so remember the owning ep in
         * the request's private area */
        uct_dc_mlx5_pending_req_priv(r)->ep = ep;
        group = uct_dc_mlx5_ep_rand_arb_group(iface, ep);
    } else {
        group = &ep->arb_group;
    }

    if (push_to_head) {
        uct_pending_req_arb_group_push_head(no_dci ?
                                            uct_dc_mlx5_iface_dci_waitq(iface) :
                                            uct_dc_mlx5_iface_tx_waitq(iface),
                                            group, r);
    } else {
        uct_pending_req_arb_group_push(group, r);
    }

    if (no_dci) {
        /* no dci:
         *  Do not grab dci here. Instead put the group on dci allocation arbiter.
         *  This way we can assure fairness between all eps waiting for
         *  dci allocation. Relevant for dcs and dcs_quota policies.
         */
        uct_dc_mlx5_iface_schedule_dci_alloc(iface, ep);
    } else {
        uct_dc_mlx5_iface_dci_sched_tx(iface, ep);
    }

    UCT_TL_EP_STAT_PEND(&ep->super);
}
1058 
1059 
1060 /* TODO:
1061    currently pending code supports only dcs policy
1062    support hash/random policies
1063  */
uct_dc_mlx5_ep_pending_add(uct_ep_h tl_ep,uct_pending_req_t * r,unsigned flags)1064 ucs_status_t uct_dc_mlx5_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *r,
1065                                         unsigned flags)
1066 {
1067     uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
1068     uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
1069 
1070     /* ep can tx iff
1071      * - iface has resources: cqe and tx skb
1072      * - dci is either assigned or can be assigned
1073      * - dci has resources
1074      */
1075     if (uct_dc_mlx5_iface_has_tx_resources(iface)) {
1076         if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
1077             if (uct_dc_mlx5_iface_dci_can_alloc(iface) && (ep->fc.fc_wnd > 0)) {
1078                 return UCS_ERR_BUSY;
1079             }
1080         } else {
1081             if (uct_dc_mlx5_iface_dci_ep_can_send(ep)) {
1082                 return UCS_ERR_BUSY;
1083             }
1084         }
1085     }
1086 
1087     uct_dc_mlx5_ep_pending_common(iface, ep, r, flags, 0);
1088 
1089     return UCS_OK;
1090 }
1091 
/**
 * Arbiter callback dispatching endpoint groups that wait for DCI allocation.
 * Relevant for dcs and dcs_quota policies only. Allocates a DCI for the
 * group's endpoint when one is available and then moves the group to the TX
 * arbiter; stops the whole dispatch when no DCI can be allocated.
 */
ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_pending_wait(ucs_arbiter_t *arbiter,
                                      ucs_arbiter_group_t *group,
                                      ucs_arbiter_elem_t *elem,
                                      void *arg)
{
    uct_dc_mlx5_ep_t *ep = ucs_container_of(group, uct_dc_mlx5_ep_t, arb_group);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);

    /* Rand policy never uses the DCI-allocation waitq */
    ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));

    if (ep->dci != UCT_DC_MLX5_EP_NO_DCI) {
        /* Already has a DCI (e.g. allocated on an earlier element) */
        return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
    }

    if (!uct_dc_mlx5_iface_dci_can_alloc(iface)) {
        /* No free DCIs - no point trying further groups either */
        return UCS_ARBITER_CB_RESULT_STOP;
    }
    uct_dc_mlx5_iface_dci_alloc(iface, ep);
    ucs_assert_always(ep->dci != UCT_DC_MLX5_EP_NO_DCI);
    /* Hand the group over to the TX arbiter for actual dispatch */
    uct_dc_mlx5_iface_dci_sched_tx(iface, ep);
    return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
}
1119 
/*
 * Common per-element pending dispatch: run the request callback and translate
 * its status into an arbiter action.
 *   UCS_OK          -> element done, remove it;
 *   UCS_INPROGRESS  -> element re-queued by the callback, go to next group;
 *   error           -> deschedule the group if this ep ran out of DCI/FC
 *                      resources, otherwise stop the whole dispatch (iface
 *                      resources must be exhausted - asserted below).
 */
ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_common_pending_tx(uct_dc_mlx5_ep_t *ep,
                                           ucs_arbiter_elem_t *elem)
{
    uct_pending_req_t *req     = ucs_container_of(elem, uct_pending_req_t, priv);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                                uct_dc_mlx5_iface_t);
    ucs_status_t status;

    if (!uct_dc_mlx5_iface_has_tx_resources(iface)) {
        return UCS_ARBITER_CB_RESULT_STOP;
    }

    ucs_trace_data("progressing pending request %p", req);
    status = req->func(req);
    ucs_trace_data("status returned from progress pending: %s",
                   ucs_status_string(status));

    if (status == UCS_OK) {
        return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
    } else if (status == UCS_INPROGRESS) {
        return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
    }

    if (!uct_dc_mlx5_iface_dci_ep_can_send(ep)) {
        /* Only this endpoint is blocked - let other groups proceed */
        return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
    }

    ucs_assertv(!uct_dc_mlx5_iface_has_tx_resources(iface),
                "pending callback returned error but send resources are available");
    return UCS_ARBITER_CB_RESULT_STOP;
}
1152 
1153 /**
1154  * dispatch requests waiting for tx resources (dcs* DCI policies)
1155  */
1156 ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_dcs_pending_tx(ucs_arbiter_t * arbiter,ucs_arbiter_group_t * group,ucs_arbiter_elem_t * elem,void * arg)1157 uct_dc_mlx5_iface_dci_do_dcs_pending_tx(ucs_arbiter_t *arbiter,
1158                                         ucs_arbiter_group_t *group,
1159                                         ucs_arbiter_elem_t *elem,
1160                                         void *arg)
1161 {
1162 
1163     uct_dc_mlx5_ep_t *ep       = ucs_container_of(group, uct_dc_mlx5_ep_t,
1164                                                   arb_group);
1165     uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
1166                                                 uct_dc_mlx5_iface_t);
1167     int is_only                = ucs_arbiter_elem_is_only(elem);
1168     ucs_arbiter_cb_result_t res;
1169 
1170     res     = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
1171     if (res == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
1172         /* For dcs* policies release dci if this is the last elem in the group
1173          * and the dci has no outstanding operations. For example pending
1174          * callback did not send anything. (uct_ep_flush or just return ok)
1175          */
1176         if (is_only) {
1177             uct_dc_mlx5_iface_dci_free(iface, ep);
1178         }
1179     }
1180 
1181     return res;
1182 }
1183 
/**
 * Arbiter callback dispatching requests that wait for TX resources under the
 * random DCI policy. Groups are shared between endpoints, so a descheduled
 * group would never be rescheduled; convert DESCHED into RESCHED whenever the
 * blocking resource is FC-related (iface/DCI resources will reschedule via
 * completions, FC grants will not).
 */
ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_rand_pending_tx(ucs_arbiter_t *arbiter,
                                         ucs_arbiter_group_t *group,
                                         ucs_arbiter_elem_t *elem,
                                         void *arg)
{
    uct_pending_req_t *req     = ucs_container_of(elem, uct_pending_req_t, priv);
    /* The owning ep is stored in the request priv area (see pending_common) */
    uct_dc_mlx5_ep_t *ep       = uct_dc_mlx5_pending_req_priv(req)->ep;
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                                uct_dc_mlx5_iface_t);
    ucs_arbiter_cb_result_t res;

    res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
    if ((res == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) &&
        uct_rc_fc_has_resources(&iface->super.super, &ep->fc)) {
        /* We can't desched group with rand policy if non FC resources are
         * missing, since it's never scheduled again. */
        res = UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
    }

    return res;
}
1209 
/*
 * Arbiter purge callback (NOTE(review): "abriter" is a long-standing typo
 * for "arbiter" in this static name). Removes the endpoint's pending
 * requests: user requests are forwarded to the purge callback (or just
 * logged when none was given), while internal FC-grant requests are returned
 * to their memory pool without notifying the user.
 */
static ucs_arbiter_cb_result_t
uct_dc_mlx5_ep_abriter_purge_cb(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group,
                                ucs_arbiter_elem_t *elem, void *arg)
{
    uct_purge_cb_args_t *cb_args = arg;
    void **priv_args             = cb_args->arg;
    uct_dc_mlx5_ep_t *ep         = priv_args[0];
    uct_dc_mlx5_iface_t *iface   = ucs_derived_of(ep->super.super.iface,
                                                  uct_dc_mlx5_iface_t);
    uct_pending_req_t *req       = ucs_container_of(elem, uct_pending_req_t, priv);
    uct_rc_fc_request_t *freq;

    if (uct_dc_mlx5_iface_is_dci_rand(iface) &&
        (uct_dc_mlx5_pending_req_priv(req)->ep != ep)) {
        /* element belongs to another ep - do not remove it */
        return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
    }

    if (ucs_likely(req->func != uct_dc_mlx5_iface_fc_grant)){
        if (cb_args->cb != NULL) {
            cb_args->cb(req, priv_args[1]);
        } else {
            ucs_debug("ep=%p cancelling user pending request %p", ep, req);
        }
    } else {
        /* User callback should not be called for FC messages.
         * Just return pending request memory to the pool */
        freq = ucs_derived_of(req, uct_rc_fc_request_t);
        ucs_mpool_put(freq);
    }

    return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
1243 
/*
 * Purge all pending requests of the endpoint, invoking cb(req, arg) for each
 * user request. The group to purge depends on the DCI policy and allocation
 * state: the (shared) TX waitq group for the rand policy, the DCI-allocation
 * waitq while no DCI is assigned, or the TX waitq otherwise - in which case
 * the now-idle DCI may also be released.
 */
void uct_dc_mlx5_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb, void *arg)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    void *priv_args[2]         = {ep, arg};
    uct_purge_cb_args_t args   = {cb, priv_args};
    ucs_arbiter_group_t *arb_group;
    ucs_arbiter_t *waitq;
    int ep_has_dci;

    if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
        ucs_arbiter_group_purge(uct_dc_mlx5_iface_tx_waitq(iface),
                                uct_dc_mlx5_ep_rand_arb_group(iface, ep),
                                uct_dc_mlx5_ep_abriter_purge_cb, &args);
        return;
    }

    ep_has_dci = (ep->dci != UCT_DC_MLX5_EP_NO_DCI);
    waitq      = ep_has_dci ? uct_dc_mlx5_iface_tx_waitq(iface) :
                              uct_dc_mlx5_iface_dci_waitq(iface);
    arb_group  = &ep->arb_group;

    ucs_arbiter_group_purge(waitq, arb_group,
                            uct_dc_mlx5_ep_abriter_purge_cb, &args);

    if (ep_has_dci) {
        /* The queue is empty now; release the DCI if nothing is outstanding */
        uct_dc_mlx5_iface_dci_free(iface, ep);
    }
}
1267 
uct_dc_mlx5_ep_check_fc(uct_dc_mlx5_iface_t * iface,uct_dc_mlx5_ep_t * ep)1268 ucs_status_t uct_dc_mlx5_ep_check_fc(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
1269 {
1270     ucs_status_t status;
1271 
1272     if (iface->super.super.config.fc_enabled) {
1273         UCT_RC_CHECK_FC_WND(&ep->fc, ep->super.stats);
1274         if ((ep->fc.fc_wnd == iface->super.super.config.fc_hard_thresh) &&
1275             !uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
1276             status = uct_rc_fc_ctrl(&ep->super.super,
1277                                     UCT_RC_EP_FC_FLAG_HARD_REQ,
1278                                     NULL);
1279             if (status != UCS_OK) {
1280                 return status;
1281             }
1282             ep->fc.flags |= UCT_DC_MLX5_EP_FC_FLAG_WAIT_FOR_GRANT;
1283             ++iface->tx.fc_grants;
1284         }
1285     } else {
1286         /* Set fc_wnd to max, to send as much as possible without checks */
1287         ep->fc.fc_wnd = INT16_MAX;
1288     }
1289     return UCS_OK;
1290 }
1291 
/**
 * Handle a transport error detected on a DC endpoint.
 *
 * Purges all outstanding operations on the endpoint's DCI, reclaims the CQ
 * credits consumed by them, releases the DCI back to the pool, clears a
 * pending FC-grant wait (if any), notifies the upper layer, and finally
 * resets and reconnects the DCI QP so it can be reused.
 *
 * @param ep        Endpoint on which the error was detected. Must have a DCI
 *                  assigned (random-DCI mode is not supported here).
 * @param arg       Error CQE, forwarded to uct_ib_mlx5_completion_with_err()
 *                  for diagnostics.
 * @param ep_status Error status propagated to outstanding operations;
 *                  UCS_ERR_CANCELED suppresses error-CQE logging.
 */
void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg,
                                   ucs_status_t ep_status)
{
    uct_iface_h tl_iface       = ep->super.super.iface;
    uint8_t dci                = ep->dci; /* saved: dci_put() below resets ep->dci */
    uct_ib_iface_t *ib_iface   = ucs_derived_of(tl_iface, uct_ib_iface_t);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
    uct_rc_txqp_t *txqp        = &iface->tx.dcis[dci].txqp;
    uct_ib_mlx5_txwq_t *txwq   = &iface->tx.dcis[dci].txwq;
    int16_t outstanding;
    ucs_status_t status;

    ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));

    uct_rc_txqp_purge_outstanding(txqp, ep_status, 0);

    /* poll_cqe for mlx5 returns NULL in case of failure and the cq_available
       is not updated for the error cqe and all outstanding wqes */
    outstanding = (int16_t)iface->super.super.config.tx_qp_len -
                  uct_rc_txqp_available(txqp);
    iface->super.super.tx.cq_available += outstanding;
    uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len);

    /* since we removed all outstanding ops on the dci, it should be released */
    ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI);
    uct_dc_mlx5_iface_dci_put(iface, dci);
    ucs_assert_always(ep->dci == UCT_DC_MLX5_EP_NO_DCI);

    if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
        /* No need to wait for grant on this ep anymore */
        uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep);
    }

    if (ep == iface->tx.fc_ep) {
        ucs_assert(ep_status != UCS_ERR_CANCELED);
        /* Cannot handle errors on flow-control endpoint.
         * Or shall we ignore them?
         */
        ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface,
                  ucs_status_string(ep_status));
    } else {
        status = ib_iface->ops->set_ep_failed(ib_iface, &ep->super.super,
                                              ep_status);
        if (status != UCS_OK) {
            /* upper layer rejected the error - log fatally and stop */
            uct_ib_mlx5_completion_with_err(ib_iface, arg, txwq,
                                            UCS_LOG_LEVEL_FATAL);
            return;
        }
    }

    if (ep_status != UCS_ERR_CANCELED) {
        uct_ib_mlx5_completion_with_err(ib_iface, arg, txwq,
                                        ib_iface->super.config.failure_level);
    }

    /* bring the DCI QP back to a usable state for the next owner */
    status = uct_dc_mlx5_iface_reset_dci(iface, &iface->tx.dcis[dci]);
    if (status != UCS_OK) {
        ucs_fatal("iface %p failed to reset dci[%d] qpn 0x%x: %s",
                  iface, dci, txwq->super.qp_num, ucs_status_string(status));
    }

    status = uct_dc_mlx5_iface_dci_connect(iface, &iface->tx.dcis[dci]);
    if (status != UCS_OK) {
        ucs_fatal("iface %p failed to connect dci[%d] qpn 0x%x: %s",
                  iface, dci, txwq->super.qp_num, ucs_status_string(status));
    }
}
1360