1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2020. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #ifdef HAVE_CONFIG_H
8 # include "config.h"
9 #endif
10
11 #include "dc_mlx5_ep.h"
12 #include "dc_mlx5.h"
13
14 #include <uct/ib/rc/accel/rc_mlx5.inl>
15 #include <uct/ib/mlx5/ib_mlx5_log.h>
16
/* Resolve the DCI currently bound to (_ep) and load its send-side state:
 * (_txqp) <- the DCI's RC txqp, (_txwq) <- the DCI's mlx5 work queue.
 *
 * Wrapped in do { } while (0) so the macro expands to a single statement
 * and composes safely with if/else at call sites (CERT PRE10-C); the
 * arguments are parenthesized so arbitrary expressions expand safely. */
#define UCT_DC_MLX5_IFACE_TXQP_GET(_iface, _ep, _txqp, _txwq) \
    do { \
        uint8_t dci_ = (_ep)->dci; \
        (_txqp) = &(_iface)->tx.dcis[dci_].txqp; \
        (_txwq) = &(_iface)->tx.dcis[dci_].txwq; \
    } while (0)
24
25 static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_iface_bcopy_post(uct_dc_mlx5_iface_t * iface,uct_dc_mlx5_ep_t * ep,unsigned opcode,unsigned length,uint64_t rdma_raddr,uct_rkey_t rdma_rkey,uct_rc_iface_send_desc_t * desc,uint8_t send_flags,uint32_t imm_val_be,const void * buffer,uct_ib_log_sge_t * log_sge)26 uct_dc_mlx5_iface_bcopy_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
27 unsigned opcode, unsigned length,
28 /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
29 uct_rc_iface_send_desc_t *desc, uint8_t send_flags,
30 uint32_t imm_val_be, const void *buffer,
31 uct_ib_log_sge_t *log_sge)
32 {
33 UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
34
35 UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
36 desc->super.sn = txwq->sw_pi;
37 uct_rc_mlx5_txqp_dptr_post(&iface->super, UCT_IB_QPT_DCI, txqp, txwq,
38 opcode, buffer, length, &desc->lkey,
39 rdma_raddr, rdma_rkey, 0, 0, 0, 0,
40 &ep->av, uct_dc_mlx5_ep_get_grh(ep),
41 uct_ib_mlx5_wqe_av_size(&ep->av),
42 MLX5_WQE_CTRL_CQ_UPDATE | send_flags, imm_val_be, INT_MAX,
43 log_sge);
44 uct_rc_txqp_add_send_op(txqp, &desc->super);
45 }
46
47
48 static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_iface_zcopy_post(uct_dc_mlx5_iface_t * iface,uct_dc_mlx5_ep_t * ep,unsigned opcode,const uct_iov_t * iov,size_t iovcnt,size_t iov_total_length,uint8_t am_id,const void * am_hdr,unsigned am_hdr_len,uint64_t rdma_raddr,uct_rkey_t rdma_rkey,uct_tag_t tag,uint32_t app_ctx,uint32_t ib_imm_be,uct_rc_send_handler_t handler,uct_completion_t * comp,uint8_t send_flags)49 uct_dc_mlx5_iface_zcopy_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
50 unsigned opcode, const uct_iov_t *iov, size_t iovcnt,
51 size_t iov_total_length,
52 /* SEND */ uint8_t am_id, const void *am_hdr, unsigned am_hdr_len,
53 /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
54 /* TAG */ uct_tag_t tag, uint32_t app_ctx, uint32_t ib_imm_be,
55 uct_rc_send_handler_t handler, uct_completion_t *comp,
56 uint8_t send_flags)
57 {
58 uint16_t sn;
59 UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
60
61 UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
62
63 sn = txwq->sw_pi;
64 uct_rc_mlx5_txqp_dptr_post_iov(&iface->super, UCT_IB_QPT_DCI, txqp,
65 txwq, opcode, iov, iovcnt,
66 am_id, am_hdr, am_hdr_len,
67 rdma_raddr, rdma_rkey,
68 tag, app_ctx, ib_imm_be,
69 &ep->av, uct_dc_mlx5_ep_get_grh(ep),
70 uct_ib_mlx5_wqe_av_size(&ep->av),
71 MLX5_WQE_CTRL_CQ_UPDATE | send_flags,
72 UCT_IB_MAX_ZCOPY_LOG_SGE(&iface->super.super.super));
73
74 uct_rc_txqp_add_send_comp(&iface->super.super, txqp, handler, comp, sn,
75 UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY, iov_total_length);
76 }
77
78 static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_iface_atomic_post(uct_dc_mlx5_iface_t * iface,uct_dc_mlx5_ep_t * ep,unsigned opcode,uct_rc_iface_send_desc_t * desc,unsigned length,uint64_t remote_addr,uct_rkey_t rkey,uint64_t compare_mask,uint64_t compare,uint64_t swap_mask,uint64_t swap_add)79 uct_dc_mlx5_iface_atomic_post(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
80 unsigned opcode, uct_rc_iface_send_desc_t *desc, unsigned length,
81 uint64_t remote_addr, uct_rkey_t rkey,
82 uint64_t compare_mask, uint64_t compare,
83 uint64_t swap_mask, uint64_t swap_add)
84 {
85 uint32_t ib_rkey = uct_ib_resolve_atomic_rkey(rkey, ep->atomic_mr_offset,
86 &remote_addr);
87
88 UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
89 UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
90
91 desc->super.sn = txwq->sw_pi;
92 uct_rc_mlx5_txqp_dptr_post(&iface->super, UCT_IB_QPT_DCI, txqp, txwq,
93 opcode, desc + 1, length, &desc->lkey,
94 remote_addr, ib_rkey,
95 compare_mask, compare, swap_mask, swap_add,
96 &ep->av, uct_dc_mlx5_ep_get_grh(ep),
97 uct_ib_mlx5_wqe_av_size(&ep->av),
98 MLX5_WQE_CTRL_CQ_UPDATE, 0, INT_MAX, NULL);
99
100 UCT_TL_EP_STAT_ATOMIC(&ep->super);
101 uct_rc_txqp_add_send_op(txqp, &desc->super);
102 }
103
104 static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep,unsigned opcode,unsigned size,uint64_t value,uint64_t remote_addr,uct_rkey_t rkey)105 uct_dc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
106 uint64_t value, uint64_t remote_addr, uct_rkey_t rkey)
107 {
108 uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
109 uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
110 uct_rc_iface_send_desc_t *desc;
111 int op;
112 uint64_t compare_mask;
113 uint64_t compare;
114 uint64_t swap_mask;
115 uint64_t swap;
116 int ext; /* not used here */
117 ucs_status_t status;
118
119 UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
120 UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_OPS);
121
122 status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op, &compare_mask,
123 &compare, &swap_mask, &swap, &ext);
124 if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
125 return status;
126 }
127
128 UCT_RC_IFACE_GET_TX_ATOMIC_DESC(&iface->super.super, &iface->super.tx.atomic_desc_mp, desc);
129 uct_dc_mlx5_iface_atomic_post(iface, ep, op, desc, size, remote_addr, rkey,
130 compare_mask, compare, swap_mask, swap);
131 return UCS_OK;
132 }
133
/* Post a fetching atomic operation (FADD/CSWAP/...) on @ep.
 * The result is delivered asynchronously into @result via the atomic
 * completion handler selected by (ext, length); @comp is signalled when the
 * operation completes. Returns UCS_INPROGRESS on successful post, or an
 * error from the resource check. */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_dc_mlx5_ep_atomic_fop(uct_dc_mlx5_ep_t *ep, int opcode, void *result, int ext,
                          unsigned length, uint64_t remote_addr, uct_rkey_t rkey,
                          uint64_t compare_mask, uint64_t compare,
                          uint64_t swap_mask, uint64_t swap_add, uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);
    uct_rc_iface_send_desc_t *desc;

    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    /* Take a descriptor whose completion handler unpacks the fetched value
     * into @result and triggers @comp. */
    UCT_RC_IFACE_GET_TX_ATOMIC_FETCH_DESC(&iface->super.super, &iface->super.tx.atomic_desc_mp,
                                          desc, uct_rc_iface_atomic_handler(&iface->super.super,
                                                                            ext, length),
                                          result, comp);
    uct_dc_mlx5_iface_atomic_post(iface, ep, opcode, desc, length, remote_addr, rkey,
                                  compare_mask, compare, swap_mask, swap_add);
    return UCS_INPROGRESS;
}
152
153 static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_atomic_fop_post(uct_ep_h tl_ep,unsigned opcode,unsigned size,uint64_t value,void * result,uint64_t remote_addr,uct_rkey_t rkey,uct_completion_t * comp)154 uct_dc_mlx5_ep_atomic_fop_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
155 uint64_t value, void *result,
156 uint64_t remote_addr, uct_rkey_t rkey,
157 uct_completion_t *comp)
158 {
159 uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
160 int op;
161 uint64_t compare_mask;
162 uint64_t compare;
163 uint64_t swap_mask;
164 uint64_t swap;
165 int ext;
166 ucs_status_t status;
167
168 UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_FOPS);
169
170 status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op, &compare_mask,
171 &compare, &swap_mask, &swap, &ext);
172 if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
173 return status;
174 }
175
176 return uct_dc_mlx5_ep_atomic_fop(ep, op, result, ext, size, remote_addr, rkey,
177 compare_mask, compare, swap_mask, swap, comp);
178 }
179
uct_dc_mlx5_ep_atomic_cswap64(uct_ep_h tl_ep,uint64_t compare,uint64_t swap,uint64_t remote_addr,uct_rkey_t rkey,uint64_t * result,uct_completion_t * comp)180 ucs_status_t uct_dc_mlx5_ep_atomic_cswap64(uct_ep_h tl_ep, uint64_t compare, uint64_t swap,
181 uint64_t remote_addr, uct_rkey_t rkey,
182 uint64_t *result, uct_completion_t *comp)
183 {
184 return uct_dc_mlx5_ep_atomic_fop(ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t),
185 MLX5_OPCODE_ATOMIC_CS, result, 0, sizeof(uint64_t),
186 remote_addr, rkey, 0, htobe64(compare), UINT64_MAX,
187 htobe64(swap), comp);
188 }
189
uct_dc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep,uint32_t compare,uint32_t swap,uint64_t remote_addr,uct_rkey_t rkey,uint32_t * result,uct_completion_t * comp)190 ucs_status_t uct_dc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep, uint32_t compare, uint32_t swap,
191 uint64_t remote_addr, uct_rkey_t rkey,
192 uint32_t *result, uct_completion_t *comp)
193 {
194 return uct_dc_mlx5_ep_atomic_fop(ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t),
195 MLX5_OPCODE_ATOMIC_MASKED_CS, result, 1,
196 sizeof(uint32_t), remote_addr, rkey, UCS_MASK(32),
197 htonl(compare), UINT64_MAX, htonl(swap), comp);
198 }
199
uct_dc_mlx5_ep_atomic32_post(uct_ep_h ep,unsigned opcode,uint32_t value,uint64_t remote_addr,uct_rkey_t rkey)200 ucs_status_t uct_dc_mlx5_ep_atomic32_post(uct_ep_h ep, unsigned opcode, uint32_t value,
201 uint64_t remote_addr, uct_rkey_t rkey)
202 {
203 return uct_dc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
204 }
205
uct_dc_mlx5_ep_atomic64_post(uct_ep_h ep,unsigned opcode,uint64_t value,uint64_t remote_addr,uct_rkey_t rkey)206 ucs_status_t uct_dc_mlx5_ep_atomic64_post(uct_ep_h ep, unsigned opcode, uint64_t value,
207 uint64_t remote_addr, uct_rkey_t rkey)
208 {
209 return uct_dc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
210 }
211
uct_dc_mlx5_ep_atomic64_fetch(uct_ep_h ep,uct_atomic_op_t opcode,uint64_t value,uint64_t * result,uint64_t remote_addr,uct_rkey_t rkey,uct_completion_t * comp)212 ucs_status_t uct_dc_mlx5_ep_atomic64_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
213 uint64_t value, uint64_t *result,
214 uint64_t remote_addr, uct_rkey_t rkey,
215 uct_completion_t *comp)
216 {
217 return uct_dc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
218 remote_addr, rkey, comp);
219 }
220
uct_dc_mlx5_ep_atomic32_fetch(uct_ep_h ep,uct_atomic_op_t opcode,uint32_t value,uint32_t * result,uint64_t remote_addr,uct_rkey_t rkey,uct_completion_t * comp)221 ucs_status_t uct_dc_mlx5_ep_atomic32_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
222 uint32_t value, uint32_t *result,
223 uint64_t remote_addr, uct_rkey_t rkey,
224 uct_completion_t *comp)
225 {
226 return uct_dc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
227 remote_addr, rkey, comp);
228 }
229
uct_dc_mlx5_ep_fence(uct_ep_h tl_ep,unsigned flags)230 ucs_status_t uct_dc_mlx5_ep_fence(uct_ep_h tl_ep, unsigned flags)
231 {
232 uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
233 uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
234
235 return uct_rc_ep_fence(tl_ep, &iface->tx.dcis[ep->dci].txwq.fi,
236 ep->dci != UCT_DC_MLX5_EP_NO_DCI);
237 }
238
/* Send a short active message fully inlined into the WQE:
 * AM id + 64-bit immediate header + payload. Checks length and flow-control
 * resources before posting; updates FC window and AM/SHORT stats on success. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_am_short_inline(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
                               const void *buffer, unsigned length)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    /* Validate AM id/length against the inline limit, then ensure both TX
     * resources and an FC credit are available (order matters: checks may
     * return UCS_ERR_NO_RESOURCE before anything is posted). */
    UCT_RC_MLX5_CHECK_AM_SHORT(id, length, UCT_IB_MLX5_AV_FULL_SIZE);
    UCT_DC_CHECK_RES_AND_FC(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                 txqp, txwq,
                                 MLX5_OPCODE_SEND,
                                 buffer, length, id, hdr, 0,
                                 0, 0,
                                 &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                                 uct_ib_mlx5_wqe_av_size(&ep->av),
                                 MLX5_WQE_CTRL_SOLICITED, INT_MAX);

    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, sizeof(hdr) + length);
    return UCS_OK;
}
265
266 #if HAVE_IBV_DM
267 static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_short_dm(uct_dc_mlx5_ep_t * ep,uct_rc_mlx5_dm_copy_data_t * cache,size_t hdr_len,const void * payload,unsigned length,unsigned opcode,uint8_t fm_ce_se,uint64_t rdma_raddr,uct_rkey_t rdma_rkey)268 uct_dc_mlx5_ep_short_dm(uct_dc_mlx5_ep_t *ep, uct_rc_mlx5_dm_copy_data_t *cache,
269 size_t hdr_len, const void *payload, unsigned length,
270 unsigned opcode, uint8_t fm_ce_se,
271 uint64_t rdma_raddr, uct_rkey_t rdma_rkey)
272 {
273 uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);
274 uct_rc_iface_send_desc_t *desc = NULL;
275 void *buffer;
276 ucs_status_t status;
277 uct_ib_log_sge_t log_sge;
278
279 status = uct_rc_mlx5_common_dm_make_data(&iface->super, cache, hdr_len,
280 payload, length, &desc,
281 &buffer, &log_sge);
282 if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
283 return status;
284 }
285
286 uct_dc_mlx5_iface_bcopy_post(iface, ep, opcode,
287 hdr_len + length,
288 rdma_raddr, rdma_rkey,
289 desc, fm_ce_se, 0, buffer,
290 log_sge.num_sge ? &log_sge : NULL);
291 return UCS_OK;
292 }
293 #endif
294
/* Short active message entry point.
 * Fast path: if the message fits the inline WQE limit (or device memory is
 * not available), send it inlined. Otherwise (HAVE_IBV_DM builds only)
 * stage it through device memory via uct_dc_mlx5_ep_short_dm(). */
ucs_status_t uct_dc_mlx5_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
                                     const void *buffer, unsigned length)
{
#if HAVE_IBV_DM
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    ucs_status_t status;
    uct_rc_mlx5_dm_copy_data_t cache;

    /* NOTE: the #if/#endif interleaving makes this one `if` in DM builds
     * and an unconditional inline send otherwise. */
    if (ucs_likely((sizeof(uct_rc_mlx5_am_short_hdr_t) + length <=
                    UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) ||
                   !iface->super.dm.dm)) {
#endif
        return uct_dc_mlx5_ep_am_short_inline(tl_ep, id, hdr, buffer, length);
#if HAVE_IBV_DM
    }

    UCT_CHECK_AM_ID(id);
    UCT_CHECK_LENGTH(length + sizeof(uct_rc_mlx5_am_short_hdr_t), 0,
                     iface->super.dm.seg_len, "am_short");
    UCT_DC_CHECK_RES_AND_FC(iface, ep);

    /* Build the AM header (id + user 64-bit header) in the copy cache */
    uct_rc_mlx5_am_hdr_fill(&cache.am_hdr.rc_hdr, id);
    cache.am_hdr.am_hdr = hdr;

    status = uct_dc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.am_hdr), buffer, length,
                                     MLX5_OPCODE_SEND,
                                     MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
                                     0, 0);
    if (UCS_STATUS_IS_ERR(status)) {
        return status;
    }
    /* Stats / flow-control window are only charged after a successful post */
    UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, sizeof(cache.am_hdr) + length);
    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    return UCS_OK;
#endif
}
332
/* Buffered-copy active message: packs the payload via @pack_cb into a send
 * descriptor prefixed with the AM header, posts it, and returns the packed
 * length (negative ucs_status_t on resource/FC failure via the macros). */
ssize_t uct_dc_mlx5_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,
                                uct_pack_callback_t pack_cb, void *arg,
                                unsigned flags)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_rc_iface_send_desc_t *desc;
    size_t length;

    UCT_CHECK_AM_ID(id);
    UCT_DC_CHECK_RES_AND_FC(iface, ep);
    /* Allocates a descriptor, writes the uct_rc_mlx5_hdr_t AM header and
     * invokes pack_cb(arg) to fill the payload; sets @length. */
    UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC(&iface->super.super, &iface->super.super.tx.mp, desc,
                                      id, uct_rc_mlx5_am_hdr_fill, uct_rc_mlx5_hdr_t,
                                      pack_cb, arg, &length);

    uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_SEND,
                                 sizeof(uct_rc_mlx5_hdr_t) + length, 0, 0, desc,
                                 MLX5_WQE_CTRL_SOLICITED, 0, desc + 1, NULL);

    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    UCT_TL_EP_STAT_OP(&ep->super, AM, BCOPY, length);
    return length;
}
356
/* Zero-copy active message: sends @header inline and the iov entries by
 * reference. Always returns UCS_INPROGRESS on post; @comp is signalled on
 * send completion. */
ucs_status_t uct_dc_mlx5_ep_am_zcopy(uct_ep_h tl_ep, uint8_t id, const void *header,
                                     unsigned header_length, const uct_iov_t *iov,
                                     size_t iovcnt, unsigned flags,
                                     uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);

    /* Validate iov count, total length vs. seg size, and header size before
     * consuming any TX/FC resources. */
    UCT_CHECK_IOV_SIZE(iovcnt, UCT_IB_MLX5_AM_ZCOPY_MAX_IOV,
                       "uct_dc_mlx5_ep_am_zcopy");
    UCT_RC_MLX5_CHECK_AM_ZCOPY(id, header_length, uct_iov_total_length(iov, iovcnt),
                               iface->super.super.super.config.seg_size,
                               UCT_IB_MLX5_AV_FULL_SIZE);
    UCT_DC_CHECK_RES_AND_FC(iface, ep);

    uct_dc_mlx5_iface_zcopy_post(iface, ep, MLX5_OPCODE_SEND, iov, iovcnt, 0ul,
                                 id, header, header_length, 0, 0, 0ul, 0, 0,
                                 uct_rc_ep_send_op_completion_handler,
                                 comp, MLX5_WQE_CTRL_SOLICITED);

    UCT_RC_UPDATE_FC_WND(&iface->super.super, &ep->fc);
    UCT_TL_EP_STAT_OP(&ep->super, AM, ZCOPY, header_length +
                      uct_iov_total_length(iov, iovcnt));

    return UCS_INPROGRESS;
}
383
/* RDMA-write a short buffer fully inlined into the WQE.
 * Applies the PUT ordering fence (which may rewrite rkey/remote_addr)
 * before posting; updates PUT/SHORT stats on success. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_put_short_inline(uct_ep_h tl_ep, const void *buffer,
                                unsigned length, uint64_t remote_addr,
                                uct_rkey_t rkey)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_RC_MLX5_CHECK_PUT_SHORT(length, UCT_IB_MLX5_AV_FULL_SIZE);
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* May adjust rkey/remote_addr (passed by address) per the ep's
     * atomic MR offset and pending fence state. */
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);
    uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                 txqp, txwq, MLX5_OPCODE_RDMA_WRITE,
                                 buffer, length, 0, 0, 0, remote_addr, rkey,
                                 &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                                 uct_ib_mlx5_wqe_av_size(&ep->av), 0, INT_MAX);

    UCT_TL_EP_STAT_OP(&ep->super, PUT, SHORT, length);

    return UCS_OK;
}
409
/* Short PUT entry point.
 * Fast path: inline the payload into the WQE when it fits (or DM is not
 * available). Otherwise (HAVE_IBV_DM builds) stage through device memory. */
ucs_status_t uct_dc_mlx5_ep_put_short(uct_ep_h tl_ep, const void *payload,
                                      unsigned length, uint64_t remote_addr,
                                      uct_rkey_t rkey)
{
#if HAVE_IBV_DM
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
    ucs_status_t status;

    /* NOTE: #if/#endif interleaving — one `if` in DM builds, unconditional
     * inline post otherwise. */
    if (ucs_likely((length <= UCT_IB_MLX5_PUT_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) ||
                   !iface->super.dm.dm)) {
#endif
        return uct_dc_mlx5_ep_put_short_inline(tl_ep, payload, length, remote_addr, rkey);
#if HAVE_IBV_DM
    }

    UCT_CHECK_LENGTH(length, 0, iface->super.dm.seg_len, "put_short");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* May adjust rkey/remote_addr (passed by address) before the post */
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);
    status = uct_dc_mlx5_ep_short_dm(ep, NULL, 0, payload, length,
                                     MLX5_OPCODE_RDMA_WRITE,
                                     MLX5_WQE_CTRL_CQ_UPDATE,
                                     remote_addr, rkey);
    if (UCS_STATUS_IS_ERR(status)) {
        return status;
    }
    UCT_TL_EP_STAT_OP(&ep->super, PUT, SHORT, length);
    return UCS_OK;
#endif
}
443
/* Buffered-copy PUT: packs data via @pack_cb into a descriptor, fences,
 * posts an RDMA write and returns the packed length. */
ssize_t uct_dc_mlx5_ep_put_bcopy(uct_ep_h tl_ep, uct_pack_callback_t pack_cb,
                                 void *arg, uint64_t remote_addr, uct_rkey_t rkey)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
    uct_rc_iface_send_desc_t *desc;
    size_t length;

    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    /* Allocates a descriptor and invokes pack_cb(arg); sets @length */
    UCT_RC_IFACE_GET_TX_PUT_BCOPY_DESC(&iface->super.super, &iface->super.super.tx.mp,
                                       desc, pack_cb, arg, length);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* May adjust rkey/remote_addr (passed by address) before the post */
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);
    uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_RDMA_WRITE, length,
                                 remote_addr, rkey, desc, 0, 0, desc + 1, NULL);
    UCT_TL_EP_STAT_OP(&ep->super, PUT, BCOPY, length);
    return length;
}
464
/* Zero-copy PUT: RDMA-writes the iov entries by reference.
 * Returns UCS_INPROGRESS on post; @comp is signalled on completion. */
ucs_status_t uct_dc_mlx5_ep_put_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov, size_t iovcnt,
                                      uint64_t remote_addr, uct_rkey_t rkey,
                                      uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_RMA_MAX_IOV(UCT_IB_MLX5_AV_FULL_SIZE),
                       "uct_dc_mlx5_ep_put_zcopy");
    UCT_CHECK_LENGTH(uct_iov_total_length(iov, iovcnt), 0, UCT_IB_MAX_MESSAGE_SIZE,
                     "put_zcopy");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* May adjust rkey/remote_addr (passed by address) before the post */
    uct_rc_mlx5_ep_fence_put(&iface->super, txwq, &rkey, &remote_addr,
                             ep->atomic_mr_offset);

    uct_dc_mlx5_iface_zcopy_post(iface, ep, MLX5_OPCODE_RDMA_WRITE, iov, iovcnt,
                                 0ul, 0, NULL, 0, remote_addr, rkey, 0ul, 0, 0,
                                 uct_rc_ep_send_op_completion_handler, comp, 0);

    UCT_TL_EP_STAT_OP(&ep->super, PUT, ZCOPY,
                      uct_iov_total_length(iov, iovcnt));
    return UCS_INPROGRESS;
}
490
/* Buffered-copy GET: RDMA-reads @length bytes into a descriptor whose
 * completion handler unpacks them via @unpack_cb into @arg.
 * Returns UCS_INPROGRESS on post; @comp is signalled on completion. */
ucs_status_t uct_dc_mlx5_ep_get_bcopy(uct_ep_h tl_ep,
                                      uct_unpack_callback_t unpack_cb,
                                      void *arg, size_t length,
                                      uint64_t remote_addr, uct_rkey_t rkey,
                                      uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uint8_t fm_ce_se           = 0;
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);
    uct_rc_iface_send_desc_t *desc;

    UCT_CHECK_LENGTH(length, 0, iface->super.super.super.config.seg_size,
                     "get_bcopy");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC(&iface->super.super,
                                       &iface->super.super.tx.mp,
                                       desc, unpack_cb, comp, arg, length);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* GET fence may rewrite rkey and set extra fm_ce_se flags (by address) */
    uct_rc_mlx5_ep_fence_get(&iface->super, txwq, &rkey, &fm_ce_se);

    uct_dc_mlx5_iface_bcopy_post(iface, ep, MLX5_OPCODE_RDMA_READ, length,
                                 remote_addr, rkey, desc, fm_ce_se, 0,
                                 desc + 1, NULL);

    /* Account outstanding read bytes for reads-in-flight throttling */
    UCT_RC_RDMA_READ_POSTED(&iface->super.super, length);
    UCT_TL_EP_STAT_OP(&ep->super, GET, BCOPY, length);

    return UCS_INPROGRESS;
}
521
522
/* Zero-copy GET: RDMA-reads directly into the iov entries.
 * Minimum length is tied to the TX inline-CQE threshold; maximum to the
 * configured max_get_zcopy. Returns UCS_INPROGRESS on post. */
ucs_status_t uct_dc_mlx5_ep_get_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov,
                                      size_t iovcnt, uint64_t remote_addr,
                                      uct_rkey_t rkey, uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uint8_t fm_ce_se           = 0;
    size_t total_length        = uct_iov_total_length(iov, iovcnt);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_RMA_MAX_IOV(UCT_IB_MLX5_AV_FULL_SIZE),
                       "uct_dc_mlx5_ep_get_zcopy");
    UCT_CHECK_LENGTH(total_length,
                     iface->super.super.super.config.max_inl_cqe[UCT_IB_DIR_TX] + 1,
                     iface->super.super.config.max_get_zcopy, "get_zcopy");
    UCT_DC_MLX5_CHECK_RMA_RES(iface, ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);
    /* GET fence may rewrite rkey and set extra fm_ce_se flags (by address) */
    uct_rc_mlx5_ep_fence_get(&iface->super, txwq, &rkey, &fm_ce_se);

    uct_dc_mlx5_iface_zcopy_post(iface, ep, MLX5_OPCODE_RDMA_READ, iov, iovcnt,
                                 total_length, 0, NULL, 0, remote_addr, rkey,
                                 0ul, 0, 0,
                                 uct_rc_ep_get_zcopy_completion_handler, comp,
                                 fm_ce_se);

    /* Account outstanding read bytes for reads-in-flight throttling */
    UCT_RC_RDMA_READ_POSTED(&iface->super.super, total_length);
    UCT_TL_EP_STAT_OP(&ep->super, GET, ZCOPY, total_length);

    return UCS_INPROGRESS;
}
553
/* Flush (or cancel) all outstanding operations on the endpoint.
 * Returns UCS_OK when nothing remains in flight, UCS_ERR_NO_RESOURCE when
 * the caller must retry later, or UCS_INPROGRESS after attaching @comp to
 * fire once the DCI drains. */
ucs_status_t uct_dc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
                                  uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    ucs_status_t status;
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
        /* Cancel is not supported with randomized DCI selection */
        if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
            return UCS_ERR_UNSUPPORTED;
        }

        uct_ep_pending_purge(tl_ep, NULL, 0);
        if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
            /* No dci -> no WQEs -> HW is clean, nothing to cancel */
            return UCS_OK;
        }

        /* Force-complete outstanding WQEs with UCS_ERR_CANCELED */
        uct_dc_mlx5_ep_handle_failure(ep, NULL, UCS_ERR_CANCELED);
        return UCS_OK;
    }

    if (!uct_dc_mlx5_iface_has_tx_resources(iface)) {
        return UCS_ERR_NO_RESOURCE;
    }

    if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
        if (!uct_dc_mlx5_iface_dci_can_alloc(iface)) {
            return UCS_ERR_NO_RESOURCE; /* waiting for dci */
        } else {
            UCT_TL_EP_STAT_FLUSH(&ep->super); /* no sends */
            return UCS_OK;
        }
    }

    if (!uct_dc_mlx5_iface_dci_ep_can_send(ep)) {
        return UCS_ERR_NO_RESOURCE; /* cannot send */
    }

    status = uct_dc_mlx5_iface_flush_dci(iface, ep->dci);
    if (status == UCS_OK) {
        UCT_TL_EP_STAT_FLUSH(&ep->super);
        return UCS_OK; /* all sends completed */
    }

    ucs_assert(status == UCS_INPROGRESS);
    ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    /* Attach @comp to complete once the signaled producer index is reached */
    return uct_rc_txqp_add_flush_comp(&iface->super.super, &ep->super, txqp,
                                      comp, txwq->sig_pi);
}
608
609 #if IBV_HW_TM
/* Send a short eager tagged message fully inlined into the WQE
 * (tag header + data). Checks length and TX resources before posting. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_dc_mlx5_ep_tag_eager_short_inline(uct_ep_h tl_ep, uct_tag_t tag,
                                      const void *data, size_t length)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    /* Payload plus the tag-matching header must fit the inline limit */
    UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
                     UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE),
                     "uct_dc_mlx5_ep_tag_short");
    UCT_DC_MLX5_CHECK_RES(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    uct_rc_mlx5_txqp_tag_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND, data, length,
                                     NULL, tag, 0, IBV_TMH_EAGER, 0,
                                     &ep->av, uct_dc_mlx5_ep_get_grh(ep),
                                     uct_ib_mlx5_wqe_av_size(&ep->av), NULL, 0,
                                     MLX5_WQE_CTRL_SOLICITED);

    UCT_TL_EP_STAT_OP(&ep->super, TAG, SHORT, length);

    return UCS_OK;
}
636
/* Short eager tagged-send entry point.
 * Fast path: inline when tmh + data fit the WQE limit (or DM unavailable).
 * Otherwise (HAVE_IBV_DM builds) stage through device memory. */
ucs_status_t uct_dc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag,
                                            const void *data, size_t length)
{
#if HAVE_IBV_DM
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_rc_mlx5_dm_copy_data_t cache;
    ucs_status_t status;

    /* NOTE: #if/#endif interleaving — one `if` in DM builds, unconditional
     * inline post otherwise. */
    if (ucs_likely((sizeof(struct ibv_tmh) + length <=
                    UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE)) ||
                   !iface->super.dm.dm)) {
#endif
        return uct_dc_mlx5_ep_tag_eager_short_inline(tl_ep, tag, data, length);
#if HAVE_IBV_DM
    }

    UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
                     iface->super.dm.seg_len, "tag_short");
    UCT_DC_MLX5_CHECK_RES(iface, ep);

    /* Build the tag-matching header in the DM copy cache */
    uct_rc_mlx5_fill_tmh(ucs_unaligned_ptr(&cache.tm_hdr), tag, 0, IBV_TMH_EAGER);

    status = uct_dc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.tm_hdr), data,
                                     length, MLX5_OPCODE_SEND,
                                     MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
                                     0, 0);
    if (!UCS_STATUS_IS_ERR(status)) {
        UCT_TL_EP_STAT_OP(&ep->super, TAG, SHORT, length);
    }

    return status;
#endif
}
671
/* Buffered-copy eager tagged send with optional immediate data.
 * Packs data via @pack_cb behind the tag-matching header and returns the
 * packed length. */
ssize_t uct_dc_mlx5_ep_tag_eager_bcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                       uint64_t imm,
                                       uct_pack_callback_t pack_cb,
                                       void *arg, unsigned flags)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_rc_iface_send_desc_t *desc;
    uint32_t app_ctx, ib_imm;
    int opcode;
    size_t length;

    UCT_DC_MLX5_CHECK_RES(iface, ep);

    /* Split the 64-bit imm into app context + IB immediate and pick
     * SEND vs SEND_IMM accordingly */
    UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND,
                            _IMM);

    UCT_RC_MLX5_IFACE_GET_TM_BCOPY_DESC(&iface->super.super,
                                        iface->super.tm.bcopy_mp,
                                        desc, tag, app_ctx, pack_cb,
                                        arg, length);

    uct_dc_mlx5_iface_bcopy_post(iface, ep, opcode,
                                 sizeof(struct ibv_tmh) + length,
                                 0, 0, desc, MLX5_WQE_CTRL_SOLICITED, ib_imm,
                                 desc + 1, NULL);

    UCT_TL_EP_STAT_OP(&ep->super, TAG, BCOPY, length);

    return length;
}
703
/* Zero-copy eager tagged send with optional immediate data.
 * Returns UCS_INPROGRESS on post; @comp is signalled on completion. */
ucs_status_t uct_dc_mlx5_ep_tag_eager_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                            uint64_t imm, const uct_iov_t *iov,
                                            size_t iovcnt, unsigned flags,
                                            uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uint32_t app_ctx, ib_imm;
    int opcode;

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_TM_EAGER_ZCOPY_MAX_IOV(UCT_IB_MLX5_AV_FULL_SIZE),
                       "uct_dc_mlx5_ep_tag_eager_zcopy");
    UCT_RC_CHECK_ZCOPY_DATA(sizeof(struct ibv_tmh),
                            uct_iov_total_length(iov, iovcnt),
                            iface->super.tm.max_zcopy);

    UCT_DC_MLX5_CHECK_RES(iface, ep);

    /* Split the 64-bit imm into app context + IB immediate and pick
     * SEND vs SEND_IMM accordingly */
    UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND, _IMM);

    /* The TM flag makes zcopy_post build the tag-matching header; the AM
     * header is an empty string (length 0). */
    uct_dc_mlx5_iface_zcopy_post(iface, ep, opcode|UCT_RC_MLX5_OPCODE_FLAG_TM,
                                 iov, iovcnt, 0ul, 0, "", 0, 0, 0, tag, app_ctx,
                                 ib_imm, uct_rc_ep_send_op_completion_handler,
                                 comp, MLX5_WQE_CTRL_SOLICITED);

    UCT_TL_EP_STAT_OP(&ep->super, TAG, ZCOPY,
                      uct_iov_total_length(iov, iovcnt));

    return UCS_INPROGRESS;
}
734
/* Post a tagged rendezvous request: sends the rendezvous headers
 * (tmh + rvh + ravh, plus the user header) inline; the actual data
 * described by @iov is fetched later by the receiver.
 * Returns the rendezvous operation index encoded as a ucs_status_ptr_t
 * (used later to cancel/complete the operation). */
ucs_status_ptr_t uct_dc_mlx5_ep_tag_rndv_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                               const void *header,
                                               unsigned header_length,
                                               const uct_iov_t *iov,
                                               size_t iovcnt, unsigned flags,
                                               uct_completion_t *comp)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep       = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    /* DC rendezvous carries an extra RAVH (address-vector header) so the
     * target can issue the RDMA read back to this endpoint */
    unsigned tm_hdr_len        = sizeof(struct ibv_tmh) +
                                 sizeof(struct ibv_rvh) +
                                 sizeof(struct ibv_ravh);
    struct ibv_ravh ravh;
    uint32_t op_index;
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    UCT_RC_MLX5_CHECK_RNDV_PARAMS(iovcnt, header_length, tm_hdr_len,
                                  UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE),
                                  iface->super.tm.max_rndv_data +
                                  UCT_RC_MLX5_TMH_PRIV_LEN);
    UCT_DC_CHECK_RES_PTR(iface, ep);

    /* Reserve an operation id that tracks this rendezvous until completion */
    op_index = uct_rc_mlx5_tag_get_op_id(&iface->super, comp);

    uct_dc_mlx5_iface_fill_ravh(&ravh, iface->rx.dct.qp_num);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    uct_rc_mlx5_txqp_tag_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND, header,
                                     header_length, iov, tag, op_index,
                                     IBV_TMH_RNDV, 0, &ep->av,
                                     uct_dc_mlx5_ep_get_grh(ep),
                                     uct_ib_mlx5_wqe_av_size(&ep->av), &ravh,
                                     sizeof(ravh), MLX5_WQE_CTRL_SOLICITED);

    return (ucs_status_ptr_t)((uint64_t)op_index);
}
773
/**
 * Send a rendezvous request message: the TM header plus the user header are
 * posted inline with EAGER protocol (IBV_TMH_EAGER), so the request itself
 * is delivered as a small eager message to the target.
 *
 * @param tl_ep          Endpoint to send on.
 * @param tag            Tag to attach to the request.
 * @param header         User-defined rendezvous request payload.
 * @param header_length  Length of @a header in bytes.
 * @param flags          Operation flags (unused here).
 *
 * @return UCS_OK on success, or an error/no-resource status from the checks.
 */
ucs_status_t uct_dc_mlx5_ep_tag_rndv_request(uct_ep_h tl_ep, uct_tag_t tag,
                                             const void* header,
                                             unsigned header_length,
                                             unsigned flags)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    /* TM header + user header must fit into the inline (short) WQE space */
    UCT_CHECK_LENGTH(header_length + sizeof(struct ibv_tmh), 0,
                     UCT_IB_MLX5_AM_MAX_SHORT(UCT_IB_MLX5_AV_FULL_SIZE),
                     "tag_rndv_request");
    UCT_DC_MLX5_CHECK_RES(iface, ep);

    UCT_DC_MLX5_IFACE_TXQP_GET(iface, ep, txqp, txwq);

    /* iov == NULL and op_index == 0: no data descriptor, nothing to cancel */
    uct_rc_mlx5_txqp_tag_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND_IMM, header,
                                     header_length, NULL, tag, 0,
                                     IBV_TMH_EAGER, 0, &ep->av,
                                     uct_dc_mlx5_ep_get_grh(ep),
                                     uct_ib_mlx5_wqe_av_size(&ep->av), NULL, 0,
                                     MLX5_WQE_CTRL_SOLICITED);
    return UCS_OK;
}
799
uct_dc_mlx5_iface_tag_recv_zcopy(uct_iface_h tl_iface,uct_tag_t tag,uct_tag_t tag_mask,const uct_iov_t * iov,size_t iovcnt,uct_tag_context_t * ctx)800 ucs_status_t uct_dc_mlx5_iface_tag_recv_zcopy(uct_iface_h tl_iface,
801 uct_tag_t tag,
802 uct_tag_t tag_mask,
803 const uct_iov_t *iov,
804 size_t iovcnt,
805 uct_tag_context_t *ctx)
806 {
807 uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
808
809 return uct_rc_mlx5_iface_common_tag_recv(&iface->super, tag, tag_mask,
810 iov, iovcnt, ctx);
811 }
812
uct_dc_mlx5_iface_tag_recv_cancel(uct_iface_h tl_iface,uct_tag_context_t * ctx,int force)813 ucs_status_t uct_dc_mlx5_iface_tag_recv_cancel(uct_iface_h tl_iface,
814 uct_tag_context_t *ctx,
815 int force)
816 {
817 uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
818
819 return uct_rc_mlx5_iface_common_tag_recv_cancel(&iface->super, ctx, force);
820 }
821 #endif
822
/**
 * Send a flow-control message on the endpoint.
 *
 * Two message kinds are handled:
 *  - UCT_RC_EP_FC_PURE_GRANT: a grant sent back to the original requester.
 *    The destination is reconstructed from the request (@a req) since the
 *    grant goes to the requester's DCT, not to this endpoint's usual target.
 *  - UCT_RC_EP_FC_FLAG_HARD_REQ: a hard credit request carrying our DCT
 *    number and GID, so the peer can send the grant back.
 *
 * @param tl_ep  Endpoint to send on.
 * @param op     FC operation (pure grant or hard request).
 * @param req    FC request describing the original sender; must be non-NULL
 *               for pure grant.
 *
 * @return UCS_OK, or error status (e.g. AH creation failure / no resources).
 */
ucs_status_t uct_dc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
                                    uct_rc_fc_request_t *req)
{
    uct_dc_mlx5_ep_t *dc_ep    = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface,
                                                uct_dc_mlx5_iface_t);
    uct_ib_iface_t *ib_iface   = &iface->super.super.super;
    struct ibv_ah_attr ah_attr = {.is_global = 0};
    uct_dc_fc_sender_data_t sender;
    uct_dc_fc_request_t *dc_req;
    struct mlx5_wqe_av mlx5_av;
    uct_ib_mlx5_base_av_t av;
    ucs_status_t status;
    uintptr_t sender_ep;
    struct ibv_ah *ah;

    UCT_DC_MLX5_TXQP_DECL(txqp, txwq);

    /* The FC op byte plus the sender-ep pointer are packed into the AV-sized
     * inline area; make sure they fit */
    ucs_assert((sizeof(uint8_t) + sizeof(sender_ep)) <=
                UCT_IB_MLX5_AV_FULL_SIZE);

    UCT_DC_MLX5_CHECK_RES(iface, dc_ep);
    UCT_DC_MLX5_IFACE_TXQP_GET(iface, dc_ep, txqp, txwq);

    dc_req = ucs_derived_of(req, uct_dc_fc_request_t);

    if (op == UCT_RC_EP_FC_PURE_GRANT) {
        ucs_assert(req != NULL);

        /* Echo back the requester's ep pointer so it can match the grant */
        sender_ep = (uintptr_t)dc_req->sender.ep;

        /* TODO: look at common code with uct_ud_mlx5_iface_get_av */
        if (dc_req->sender.global.is_global) {
            /* Requester needs GRH routing: build an AH from its GID/LID to
             * extract a full mlx5 AV (including the GRH section) */
            uct_ib_iface_fill_ah_attr_from_gid_lid(ib_iface, dc_req->lid,
                    ucs_unaligned_ptr(&dc_req->sender.global.gid),
                    iface->super.super.super.gid_info.gid_index,
                    0, &ah_attr);

            status = uct_ib_iface_create_ah(ib_iface, &ah_attr, &ah);
            if (status != UCS_OK) {
                return status;
            }

            uct_ib_mlx5_get_av(ah, &mlx5_av);
        }

        /* Note av initialization is copied from exp verbs */
        av.stat_rate_sl = ib_iface->config.sl; /* (attr->static_rate << 4) | attr->sl */
        av.fl_mlid      = ib_iface->path_bits[0] & 0x7f;

        /* lid in dc_req is in BE already */
        if (uct_ib_iface_is_roce(ib_iface)) {
            av.rlid     = htons(UCT_IB_ROCE_UDP_SRC_PORT_BASE);
        } else {
            av.rlid     = dc_req->lid | htons(ib_iface->path_bits[0]);
        }
        av.dqp_dct      = htonl(dc_req->dct_num);

        /* Extended AV is required when the AV cannot be compacted or when a
         * GRH section follows the base AV */
        if (!iface->ud_common.config.compact_av || ah_attr.is_global) {
            av.dqp_dct |= UCT_IB_MLX5_EXTENDED_UD_AV;
        }

        uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND,
                                     &av /*dummy*/, 0, op, sender_ep, 0,
                                     0, 0,
                                     &av, ah_attr.is_global ? mlx5_av_grh(&mlx5_av) : NULL,
                                     uct_ib_mlx5_wqe_av_size(&av), 0, INT_MAX);
    } else {
        ucs_assert(op == UCT_RC_EP_FC_FLAG_HARD_REQ);
        /* Hard request: send our identity (ep pointer + GID + GRH flag) so
         * the peer can address the grant back to us */
        sender.ep               = (uint64_t)dc_ep;
        sender.global.gid       = ib_iface->gid_info.gid;
        sender.global.is_global = dc_ep->flags & UCT_DC_MLX5_EP_FLAG_GRH;

        UCS_STATS_UPDATE_COUNTER(dc_ep->fc.stats,
                                 UCT_RC_FC_STAT_TX_HARD_REQ, 1);

        uct_rc_mlx5_txqp_inline_post(&iface->super, UCT_IB_QPT_DCI,
                                     txqp, txwq, MLX5_OPCODE_SEND_IMM,
                                     &sender.global, sizeof(sender.global), op, sender.ep,
                                     iface->rx.dct.qp_num,
                                     0, 0,
                                     &dc_ep->av,
                                     uct_dc_mlx5_ep_get_grh(dc_ep),
                                     uct_ib_mlx5_wqe_av_size(&dc_ep->av),
                                     MLX5_WQE_CTRL_SOLICITED, INT_MAX);
    }

    return UCS_OK;
}
913
914
/* Endpoint constructor: copies the caller-provided base address vector,
 * mixes in the remote DCT number from the interface address, and finishes
 * with the common DC endpoint initialization (DCI assignment policy etc.). */
UCS_CLASS_INIT_FUNC(uct_dc_mlx5_ep_t, uct_dc_mlx5_iface_t *iface,
                    const uct_dc_mlx5_iface_addr_t *if_addr,
                    uct_ib_mlx5_base_av_t *av)
{
    uint32_t dest_dctn;

    ucs_trace_func("");

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super.super.super);

    /* Destination DCT number arrives packed as 24 bits in the address blob */
    dest_dctn               = uct_ib_unpack_uint24(if_addr->qp_num);
    self->atomic_mr_offset  = uct_ib_md_atomic_offset(if_addr->atomic_mr_id);

    self->av                = *av;
    self->av.dqp_dct       |= htonl(dest_dctn);

    return uct_dc_mlx5_ep_basic_init(iface, self);
}
933
/* Endpoint destructor: purges pending requests and FC state, and if the
 * endpoint still owns a DCI (dcs* policies), cancels its outstanding
 * operations and detaches it from the DCI. */
static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_ep_t)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(self->super.super.iface, uct_dc_mlx5_iface_t);

    uct_dc_mlx5_ep_pending_purge(&self->super.super, NULL, NULL);
    uct_rc_fc_cleanup(&self->fc);

    /* Invalid (garbage-collected) endpoints are freed via ep_release, not here */
    ucs_assert_always(self->flags & UCT_DC_MLX5_EP_FLAG_VALID);

    /* With rand policy, or when no DCI is assigned, there is no per-ep DCI
     * state to tear down */
    if ((self->dci == UCT_DC_MLX5_EP_NO_DCI) ||
        uct_dc_mlx5_iface_is_dci_rand(iface)) {
        return;
    }

    /* TODO: this is good for dcs policy only.
     * Need to change if eps share dci
     */
    ucs_arbiter_group_cleanup(uct_dc_mlx5_ep_arb_group(iface, self));
    /* A DCI still attached to an ep must have outstanding operations;
     * otherwise it should have been released already (leak) */
    ucs_assertv_always(uct_dc_mlx5_iface_dci_has_outstanding(iface, self->dci),
                       "iface (%p) ep (%p) dci leak detected: dci=%d", iface,
                       self, self->dci);

    /* we can handle it but well behaving app should not do this */
    ucs_debug("ep (%p) is destroyed with %d outstanding ops",
              self, (int16_t)iface->super.super.config.tx_qp_len -
              uct_rc_txqp_available(&iface->tx.dcis[self->dci].txqp));
    uct_rc_txqp_purge_outstanding(&iface->tx.dcis[self->dci].txqp, UCS_ERR_CANCELED, 1);
    iface->tx.dcis[self->dci].ep = NULL;
}
963
/* Register the endpoint class (derived from uct_base_ep_t) and generate its
 * allocate-and-init constructor taking the arguments listed below. */
UCS_CLASS_DEFINE(uct_dc_mlx5_ep_t, uct_base_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_dc_mlx5_ep_t, uct_ep_t, uct_dc_mlx5_iface_t *,
                          const uct_dc_mlx5_iface_addr_t *,
                          uct_ib_mlx5_base_av_t *);
968
/* GRH endpoint constructor: initializes the base DC endpoint, then stores a
 * copy of the GRH address vector and flags the endpoint as GRH-routed. */
UCS_CLASS_INIT_FUNC(uct_dc_mlx5_grh_ep_t, uct_dc_mlx5_iface_t *iface,
                    const uct_dc_mlx5_iface_addr_t *if_addr,
                    uct_ib_mlx5_base_av_t *av,
                    struct mlx5_grh_av *grh_av)
{
    ucs_trace_func("");

    UCS_CLASS_CALL_SUPER_INIT(uct_dc_mlx5_ep_t, iface, if_addr, av);

    self->grh_av        = *grh_av;
    self->super.flags  |= UCT_DC_MLX5_EP_FLAG_GRH;
    return UCS_OK;
}
982
/* GRH endpoint destructor: no GRH-specific state to release; base class
 * cleanup runs via the class framework. */
UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_grh_ep_t)
{
    ucs_trace_func("");
}
987
/* Register the GRH endpoint class (derived from the base DC endpoint) and
 * generate its allocate-and-init constructor. */
UCS_CLASS_DEFINE(uct_dc_mlx5_grh_ep_t, uct_dc_mlx5_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_dc_mlx5_grh_ep_t, uct_ep_t, uct_dc_mlx5_iface_t *,
                          const uct_dc_mlx5_iface_addr_t *,
                          uct_ib_mlx5_base_av_t *, struct mlx5_grh_av *);
992
/* Destroy an endpoint. If the endpoint is still waiting for an FC grant it
 * cannot be freed yet (a grant may still arrive and be matched against it),
 * so it is invalidated and parked on the interface garbage-collection list;
 * otherwise it is freed immediately. */
void uct_dc_mlx5_ep_cleanup(uct_ep_h tl_ep, ucs_class_t *cls)
{
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);

    UCS_CLASS_CLEANUP_CALL(cls, ep);

    if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
        ucs_trace("not releasing dc_mlx5_ep %p - waiting for grant", ep);
        ep->flags &= ~UCT_DC_MLX5_EP_FLAG_VALID;
        /* No need to wait for grant on this ep anymore */
        uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep);
        ucs_list_add_tail(&iface->tx.gc_list, &ep->list);
    } else {
        ucs_free(ep);
    }
}
1010
/* Free an endpoint previously parked on the garbage-collection list by
 * uct_dc_mlx5_ep_cleanup (it must already be marked invalid). */
void uct_dc_mlx5_ep_release(uct_dc_mlx5_ep_t *ep)
{
    ucs_assert_always(!(ep->flags & UCT_DC_MLX5_EP_FLAG_VALID));
    ucs_debug("release dc_mlx5_ep %p", ep);
    ucs_list_del(&ep->list);
    ucs_free(ep);
}
1018
/* Queue a pending request on the appropriate arbiter group and schedule the
 * group on the right arbiter: the DCI-allocation waitq when the endpoint has
 * no DCI yet, or the TX waitq otherwise.
 *
 * @param push_to_head  If nonzero, insert the request at the head of the
 *                      group (used for re-queueing) instead of the tail. */
void uct_dc_mlx5_ep_pending_common(uct_dc_mlx5_iface_t *iface,
                                   uct_dc_mlx5_ep_t *ep, uct_pending_req_t *r,
                                   unsigned flags, int push_to_head)
{
    int no_dci = (ep->dci == UCT_DC_MLX5_EP_NO_DCI);
    ucs_arbiter_group_t *group;

    /* The per-request private area must be able to hold our priv struct */
    UCS_STATIC_ASSERT(sizeof(uct_dc_mlx5_pending_req_priv) <=
                      UCT_PENDING_REQ_PRIV_LEN);

    if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
        /* rand policy: group is shared, so remember the owning ep inside
         * the request itself */
        uct_dc_mlx5_pending_req_priv(r)->ep = ep;
        group = uct_dc_mlx5_ep_rand_arb_group(iface, ep);
    } else {
        group = &ep->arb_group;
    }

    if (push_to_head) {
        uct_pending_req_arb_group_push_head(no_dci ?
                                            uct_dc_mlx5_iface_dci_waitq(iface) :
                                            uct_dc_mlx5_iface_tx_waitq(iface),
                                            group, r);
    } else {
        uct_pending_req_arb_group_push(group, r);
    }

    if (no_dci) {
        /* no dci:
         * Do not grab dci here. Instead put the group on dci allocation arbiter.
         * This way we can assure fairness between all eps waiting for
         * dci allocation. Relevant for dcs and dcs_quota policies.
         */
        uct_dc_mlx5_iface_schedule_dci_alloc(iface, ep);
    } else {
        uct_dc_mlx5_iface_dci_sched_tx(iface, ep);
    }

    UCT_TL_EP_STAT_PEND(&ep->super);
}
1058
1059
1060 /* TODO:
1061 currently pending code supports only dcs policy
1062 support hash/random policies
1063 */
/* Add a request to the endpoint's pending queue.
 *
 * Per the UCT pending-add contract, if the endpoint could actually send
 * right now (iface has CQEs and tx descriptors, and the DCI is either
 * usable or allocatable with FC window open), return UCS_ERR_BUSY so the
 * caller retries the send instead of queueing. Otherwise queue the request
 * and return UCS_OK. */
ucs_status_t uct_dc_mlx5_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *r,
                                        unsigned flags)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);

    /* ep can tx iff
     * - iface has resources: cqe and tx skb
     * - dci is either assigned or can be assigned
     * - dci has resources
     */
    if (uct_dc_mlx5_iface_has_tx_resources(iface)) {
        if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
            if (uct_dc_mlx5_iface_dci_can_alloc(iface) && (ep->fc.fc_wnd > 0)) {
                return UCS_ERR_BUSY;
            }
        } else {
            if (uct_dc_mlx5_iface_dci_ep_can_send(ep)) {
                return UCS_ERR_BUSY;
            }
        }
    }

    uct_dc_mlx5_ep_pending_common(iface, ep, r, flags, 0);

    return UCS_OK;
}
1091
1092 /**
1093 * dispatch requests waiting for dci allocation
1094 * Relevant for dcs and dcs_quota policies only.
1095 */
/* Arbiter callback dispatching groups that wait for DCI allocation.
 * Tries to allocate a DCI for the group's endpoint; once allocated, the
 * group is descheduled from the allocation waitq and the endpoint is
 * scheduled on the TX waitq instead. Stops the whole arbiter sweep when no
 * DCI is available (fairness: later groups must not jump the queue). */
ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_pending_wait(ucs_arbiter_t *arbiter,
                                      ucs_arbiter_group_t *group,
                                      ucs_arbiter_elem_t *elem,
                                      void *arg)
{
    uct_dc_mlx5_ep_t *ep = ucs_container_of(group, uct_dc_mlx5_ep_t, arb_group);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_dc_mlx5_iface_t);

    /* rand policy never queues on the DCI-allocation waitq */
    ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));

    if (ep->dci != UCT_DC_MLX5_EP_NO_DCI) {
        /* already has a DCI - nothing to wait for */
        return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
    }

    if (!uct_dc_mlx5_iface_dci_can_alloc(iface)) {
        return UCS_ARBITER_CB_RESULT_STOP;
    }
    uct_dc_mlx5_iface_dci_alloc(iface, ep);
    ucs_assert_always(ep->dci != UCT_DC_MLX5_EP_NO_DCI);
    uct_dc_mlx5_iface_dci_sched_tx(iface, ep);
    return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
}
1119
/* Common part of dispatching one pending request: invoke the request
 * callback and translate its status into an arbiter verdict.
 *
 *  UCS_OK          -> request done, remove it from the group
 *  UCS_INPROGRESS  -> request re-queued itself, move to next group
 *  other (error)   -> could not send; desched the group if only this
 *                     endpoint lacks resources, otherwise stop the sweep
 *                     (iface-wide resource shortage). */
ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_common_pending_tx(uct_dc_mlx5_ep_t *ep,
                                           ucs_arbiter_elem_t *elem)
{
    uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                                uct_dc_mlx5_iface_t);
    ucs_status_t status;

    if (!uct_dc_mlx5_iface_has_tx_resources(iface)) {
        /* interface-wide shortage - no group can progress */
        return UCS_ARBITER_CB_RESULT_STOP;
    }

    ucs_trace_data("progressing pending request %p", req);
    status = req->func(req);
    ucs_trace_data("status returned from progress pending: %s",
                   ucs_status_string(status));

    if (status == UCS_OK) {
        return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
    } else if (status == UCS_INPROGRESS) {
        return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
    }

    if (!uct_dc_mlx5_iface_dci_ep_can_send(ep)) {
        /* only this endpoint is blocked - let other groups run */
        return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
    }

    ucs_assertv(!uct_dc_mlx5_iface_has_tx_resources(iface),
                "pending callback returned error but send resources are available");
    return UCS_ARBITER_CB_RESULT_STOP;
}
1152
1153 /**
1154 * dispatch requests waiting for tx resources (dcs* DCI policies)
1155 */
1156 ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_dcs_pending_tx(ucs_arbiter_t * arbiter,ucs_arbiter_group_t * group,ucs_arbiter_elem_t * elem,void * arg)1157 uct_dc_mlx5_iface_dci_do_dcs_pending_tx(ucs_arbiter_t *arbiter,
1158 ucs_arbiter_group_t *group,
1159 ucs_arbiter_elem_t *elem,
1160 void *arg)
1161 {
1162
1163 uct_dc_mlx5_ep_t *ep = ucs_container_of(group, uct_dc_mlx5_ep_t,
1164 arb_group);
1165 uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
1166 uct_dc_mlx5_iface_t);
1167 int is_only = ucs_arbiter_elem_is_only(elem);
1168 ucs_arbiter_cb_result_t res;
1169
1170 res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
1171 if (res == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
1172 /* For dcs* policies release dci if this is the last elem in the group
1173 * and the dci has no outstanding operations. For example pending
1174 * callback did not send anything. (uct_ep_flush or just return ok)
1175 */
1176 if (is_only) {
1177 uct_dc_mlx5_iface_dci_free(iface, ep);
1178 }
1179 }
1180
1181 return res;
1182 }
1183
1184 /**
1185 * dispatch requests waiting for tx resources (rand DCI policy)
1186 */
/* Arbiter callback dispatching pending TX requests for the rand DCI policy.
 * The owning endpoint is taken from the request's private area (groups are
 * shared under rand policy, so the group does not identify the ep). */
ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_dci_do_rand_pending_tx(ucs_arbiter_t *arbiter,
                                         ucs_arbiter_group_t *group,
                                         ucs_arbiter_elem_t *elem,
                                         void *arg)
{
    uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
    uct_dc_mlx5_ep_t *ep = uct_dc_mlx5_pending_req_priv(req)->ep;
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                                uct_dc_mlx5_iface_t);
    ucs_arbiter_cb_result_t res;

    res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
    if ((res == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) &&
        uct_rc_fc_has_resources(&iface->super.super, &ep->fc)) {
        /* We can't desched group with rand policy if non FC resources are
         * missing, since it's never scheduled again. */
        res = UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
    }

    return res;
}
1209
/* Arbiter purge callback: removes pending requests that belong to the
 * endpoint passed via args. User requests are handed to the user purge
 * callback (if any); internal FC-grant requests are returned to their
 * memory pool instead. Under rand policy, requests of other endpoints
 * sharing the group are skipped.
 * (NOTE: "abriter" is a historical typo kept for ABI/name stability.) */
static ucs_arbiter_cb_result_t
uct_dc_mlx5_ep_abriter_purge_cb(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group,
                                ucs_arbiter_elem_t *elem, void *arg)
{
    uct_purge_cb_args_t *cb_args = arg;
    /* args layout (set up by pending_purge): [0] = ep, [1] = user arg */
    void **priv_args = cb_args->arg;
    uct_dc_mlx5_ep_t *ep = priv_args[0];
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                                uct_dc_mlx5_iface_t);
    uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
    uct_rc_fc_request_t *freq;

    if (uct_dc_mlx5_iface_is_dci_rand(iface) &&
        (uct_dc_mlx5_pending_req_priv(req)->ep != ep)) {
        /* element belongs to another ep - do not remove it */
        return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
    }

    if (ucs_likely(req->func != uct_dc_mlx5_iface_fc_grant)){
        if (cb_args->cb != NULL) {
            cb_args->cb(req, priv_args[1]);
        } else {
            ucs_debug("ep=%p cancelling user pending request %p", ep, req);
        }
    } else {
        /* User callback should not be called for FC messages.
         * Just return pending request memory to the pool */
        freq = ucs_derived_of(req, uct_rc_fc_request_t);
        ucs_mpool_put(freq);
    }

    return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
1243
/* Purge all pending requests of an endpoint, invoking @a cb for each user
 * request. The arbiter/group to purge depends on the DCI policy and on
 * whether the endpoint currently holds a DCI; when it does, the DCI is
 * also given a chance to be released after the purge. */
void uct_dc_mlx5_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb, void *arg)
{
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_dc_mlx5_iface_t);
    uct_dc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_dc_mlx5_ep_t);
    /* packed as [ep, user arg] for uct_dc_mlx5_ep_abriter_purge_cb */
    void *priv_args[2] = {ep, arg};
    uct_purge_cb_args_t args = {cb, priv_args};

    if (uct_dc_mlx5_iface_is_dci_rand(iface)) {
        ucs_arbiter_group_purge(uct_dc_mlx5_iface_tx_waitq(iface),
                                uct_dc_mlx5_ep_rand_arb_group(iface, ep),
                                uct_dc_mlx5_ep_abriter_purge_cb, &args);
        return;
    }

    if (ep->dci == UCT_DC_MLX5_EP_NO_DCI) {
        /* requests are parked on the DCI-allocation waitq */
        ucs_arbiter_group_purge(uct_dc_mlx5_iface_dci_waitq(iface), &ep->arb_group,
                                uct_dc_mlx5_ep_abriter_purge_cb, &args);
    } else {
        ucs_arbiter_group_purge(uct_dc_mlx5_iface_tx_waitq(iface), &ep->arb_group,
                                uct_dc_mlx5_ep_abriter_purge_cb, &args);
        uct_dc_mlx5_iface_dci_free(iface, ep);
    }
}
1267
/* Flow-control check before a send: verify the FC window is open, and when
 * the window drops to the hard threshold, issue a hard credit request
 * (once - guarded by the wait-for-grant flag). With FC disabled, the window
 * is simply kept saturated so no sends are throttled.
 *
 * @return UCS_OK if the send may proceed, error status otherwise. */
ucs_status_t uct_dc_mlx5_ep_check_fc(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
{
    ucs_status_t status;

    if (iface->super.super.config.fc_enabled) {
        UCT_RC_CHECK_FC_WND(&ep->fc, ep->super.stats);
        if ((ep->fc.fc_wnd == iface->super.super.config.fc_hard_thresh) &&
            !uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
            status = uct_rc_fc_ctrl(&ep->super.super,
                                    UCT_RC_EP_FC_FLAG_HARD_REQ,
                                    NULL);
            if (status != UCS_OK) {
                return status;
            }
            /* remember a grant is expected; counted per-iface as well */
            ep->fc.flags |= UCT_DC_MLX5_EP_FC_FLAG_WAIT_FOR_GRANT;
            ++iface->tx.fc_grants;
        }
    } else {
        /* Set fc_wnd to max, to send as much as possible without checks */
        ep->fc.fc_wnd = INT16_MAX;
    }
    return UCS_OK;
}
1291
/* Handle a fatal error on the DCI currently used by @a ep: purge and cancel
 * its outstanding operations, reclaim CQ credits, release the DCI, report
 * the failure upward, and finally reset and reconnect the DCI so it can be
 * reused. Errors on the internal FC endpoint are only logged.
 *
 * @param ep         Endpoint whose DCI failed (must hold a DCI; dcs* policy
 *                   only - rand policy is asserted out).
 * @param arg        Error CQE, passed to the completion-error reporter.
 * @param ep_status  Status to propagate to outstanding operations. */
void uct_dc_mlx5_ep_handle_failure(uct_dc_mlx5_ep_t *ep, void *arg,
                                   ucs_status_t ep_status)
{
    uct_iface_h tl_iface = ep->super.super.iface;
    uint8_t dci = ep->dci;
    uct_ib_iface_t *ib_iface = ucs_derived_of(tl_iface, uct_ib_iface_t);
    uct_dc_mlx5_iface_t *iface = ucs_derived_of(tl_iface, uct_dc_mlx5_iface_t);
    uct_rc_txqp_t *txqp = &iface->tx.dcis[dci].txqp;
    uct_ib_mlx5_txwq_t *txwq = &iface->tx.dcis[dci].txwq;
    int16_t outstanding;
    ucs_status_t status;

    ucs_assert(!uct_dc_mlx5_iface_is_dci_rand(iface));

    uct_rc_txqp_purge_outstanding(txqp, ep_status, 0);

    /* poll_cqe for mlx5 returns NULL in case of failure and the cq_avaialble
       is not updated for the error cqe and all outstanding wqes*/
    outstanding = (int16_t)iface->super.super.config.tx_qp_len -
                  uct_rc_txqp_available(txqp);
    iface->super.super.tx.cq_available += outstanding;
    uct_rc_txqp_available_set(txqp, (int16_t)iface->super.super.config.tx_qp_len);

    /* since we removed all outstanding ops on the dci, it should be released */
    ucs_assert(ep->dci != UCT_DC_MLX5_EP_NO_DCI);
    uct_dc_mlx5_iface_dci_put(iface, dci);
    ucs_assert_always(ep->dci == UCT_DC_MLX5_EP_NO_DCI);

    if (uct_dc_mlx5_ep_fc_wait_for_grant(ep)) {
        /* No need to wait for grant on this ep anymore */
        uct_dc_mlx5_ep_clear_fc_grant_flag(iface, ep);
    }

    if (ep == iface->tx.fc_ep) {
        ucs_assert(ep_status != UCS_ERR_CANCELED);
        /* Cannot handle errors on flow-control endpoint.
         * Or shall we ignore them?
         */
        ucs_debug("got error on DC flow-control endpoint, iface %p: %s", iface,
                  ucs_status_string(ep_status));
    } else {
        status = ib_iface->ops->set_ep_failed(ib_iface, &ep->super.super,
                                              ep_status);
        if (status != UCS_OK) {
            /* upper layer could not take over the failure - fatal */
            uct_ib_mlx5_completion_with_err(ib_iface, arg,
                                            &iface->tx.dcis[dci].txwq,
                                            UCS_LOG_LEVEL_FATAL);
            return;
        }
    }

    if (ep_status != UCS_ERR_CANCELED) {
        /* log the error CQE at the configured severity */
        uct_ib_mlx5_completion_with_err(ib_iface, arg, &iface->tx.dcis[dci].txwq,
                                        ib_iface->super.config.failure_level);
    }

    /* Bring the DCI back into service: reset the QP, then reconnect it */
    status = uct_dc_mlx5_iface_reset_dci(iface, &iface->tx.dcis[dci]);
    if (status != UCS_OK) {
        ucs_fatal("iface %p failed to reset dci[%d] qpn 0x%x: %s",
                  iface, dci, txwq->super.qp_num, ucs_status_string(status));
    }

    status = uct_dc_mlx5_iface_dci_connect(iface, &iface->tx.dcis[dci]);
    if (status != UCS_OK) {
        ucs_fatal("iface %p failed to connect dci[%d] qpn 0x%x: %s",
                  iface, dci, txwq->super.qp_num, ucs_status_string(status));
    }
}
1360