1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #ifdef HAVE_CONFIG_H
8 #  include "config.h"
9 #endif
10 
11 #include "rc_mlx5.h"
12 #if HAVE_DECL_IBV_CMD_MODIFY_QP
13 #  include <infiniband/driver.h>
14 #endif
15 
16 #include <uct/ib/mlx5/ib_mlx5_log.h>
17 #include <uct/ib/mlx5/exp/ib_exp.h>
18 #include <ucs/arch/cpu.h>
19 #include <ucs/sys/compiler.h>
20 #include <arpa/inet.h> /* For htonl */
21 
22 #include "rc_mlx5.inl"
23 
24 
/*
 * Helper function for buffer-copy post.
 *
 * Posts a single-pointer-descriptor WQE whose data segment references the
 * bounce buffer owned by @a desc, and adds the descriptor to the TXQP
 * callback queue so it is released/completed when the matching CQE arrives.
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_mlx5_txqp_bcopy_post(uct_rc_mlx5_iface_common_t *iface,
                            uct_rc_txqp_t *txqp, uct_ib_mlx5_txwq_t *txwq,
                            unsigned opcode, unsigned length,
                            /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
                            uint8_t fm_ce_se, uint32_t imm_val_be,
                            uct_rc_iface_send_desc_t *desc, const void *buffer,
                            uct_ib_log_sge_t *log_sge)
{
    /* Record the send sequence number before posting, so the descriptor can
     * be matched against the completion that signals this WQE */
    desc->super.sn = txwq->sw_pi;
    uct_rc_mlx5_txqp_dptr_post(iface, IBV_QPT_RC, txqp, txwq,
                               opcode, buffer, length, &desc->lkey,
                               rdma_raddr, rdma_rkey, 0, 0, 0, 0,
                               NULL, NULL, 0, fm_ce_se, imm_val_be, INT_MAX, log_sge);
    uct_rc_txqp_add_send_op(txqp, &desc->super);
}
46 
/*
 * Helper function for zero-copy post.
 *
 * Posts a WQE which references the user iov directly (no intermediate copy)
 * and adds the user completion to the callback queue, to be invoked when the
 * send is complete.
 *
 * Returns UCS_INPROGRESS on success (completion reported asynchronously), or
 * an error status if TX resources are unavailable (via UCT_RC_CHECK_RES).
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_mlx5_ep_zcopy_post(uct_rc_mlx5_ep_t *ep, unsigned opcode,
                          const uct_iov_t *iov, size_t iovcnt, size_t iov_total_length,
                          /* SEND */ uint8_t am_id, const void *am_hdr, unsigned am_hdr_len,
                          /* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
                          /* TAG  */ uct_tag_t tag, uint32_t app_ctx, uint32_t ib_imm_be,
                          int force_sig, uct_rc_send_handler_t handler,
                          uct_completion_t *comp)
{
    uct_rc_mlx5_iface_common_t *iface  = ucs_derived_of(ep->super.super.super.iface,
                                                        uct_rc_mlx5_iface_common_t);
    uint16_t sn;

    /* NOTE: may return early with an error if no TX resources */
    UCT_RC_CHECK_RES(&iface->super, &ep->super);

    sn = ep->tx.wq.sw_pi;
    uct_rc_mlx5_txqp_dptr_post_iov(iface, IBV_QPT_RC,
                                   &ep->super.txqp, &ep->tx.wq,
                                   opcode, iov, iovcnt,
                                   am_id, am_hdr, am_hdr_len,
                                   rdma_raddr, uct_ib_md_direct_rkey(rdma_rkey),
                                   tag, app_ctx, ib_imm_be,
                                   NULL, NULL, 0,
                                   /* when the caller passed a completion, force a
                                    * CQE so we know when to invoke it */
                                   (comp == NULL) ? force_sig : MLX5_WQE_CTRL_CQ_UPDATE,
                                   UCT_IB_MAX_ZCOPY_LOG_SGE(&iface->super.super));

    uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, handler, comp, sn,
                              UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY, iov_total_length);

    return UCS_INPROGRESS;
}
82 
/* Post a short PUT with the payload copied inline into the WQE.
 * Caller-visible length limit is enforced by UCT_RC_MLX5_CHECK_PUT_SHORT. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_put_short_inline(uct_ep_h tl_ep, const void *buffer, unsigned length,
                                uint64_t remote_addr, uct_rkey_t rkey)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    UCT_RC_MLX5_CHECK_PUT_SHORT(length, 0);
    UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);

    /* Apply any pending fence and adjust rkey/address for the atomic MR offset */
    uct_rc_mlx5_ep_fence_put(iface, &ep->tx.wq, &rkey, &remote_addr,
                             ep->super.atomic_mr_offset);
    uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
                                 &ep->super.txqp, &ep->tx.wq,
                                 MLX5_OPCODE_RDMA_WRITE,
                                 buffer, length, 0, 0, 0, remote_addr, rkey,
                                 NULL, NULL, 0, 0, INT_MAX);
    UCT_TL_EP_STAT_OP(&ep->super.super, PUT, SHORT, length);
    return UCS_OK;
}
101 
/* Post a short active message with an 8-byte immediate header, payload copied
 * inline into the WQE. Subject to AM id and flow-control checks. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_am_short_inline(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
                               const void *payload, unsigned length)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    UCT_RC_MLX5_CHECK_AM_SHORT(id, length, 0);
    UCT_RC_CHECK_RES(&iface->super, &ep->super);
    /* May return early if no FC credits */
    UCT_RC_CHECK_FC(&iface->super, &ep->super, id);

    uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
                                 &ep->super.txqp, &ep->tx.wq,
                                 MLX5_OPCODE_SEND,
                                 payload, length,
                                 id, hdr, 0,
                                 0, 0,
                                 NULL, NULL, 0,
                                 MLX5_WQE_CTRL_SOLICITED,
                                 INT_MAX);
    UCT_TL_EP_STAT_OP(&ep->super.super, AM, SHORT, sizeof(hdr) + length);
    UCT_RC_UPDATE_FC(&iface->super, &ep->super, id);
    return UCS_OK;
}
124 
#if HAVE_IBV_DM
/* Send a short message through device memory (DM): copy the optional cached
 * header plus the payload into a DM segment, then post a buffer-copy WQE
 * referencing it. Used when the message exceeds the inline-short limit. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_short_dm(uct_rc_mlx5_ep_t *ep, uct_rc_mlx5_dm_copy_data_t *cache,
                        size_t hdr_len, const void *payload, unsigned length,
                        unsigned opcode, uint8_t fm_ce_se,
                        uint64_t rdma_raddr, uct_rkey_t rdma_rkey)
{
    uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(ep->super.super.super.iface,
                                                       uct_rc_mlx5_iface_common_t);
    uct_rc_iface_send_desc_t *desc    = NULL;
    void *buffer;
    ucs_status_t status;
    uct_ib_log_sge_t log_sge;

    /* Build the DM data: fills desc/buffer/log_sge on success */
    status = uct_rc_mlx5_common_dm_make_data(iface, cache, hdr_len, payload,
                                             length, &desc, &buffer, &log_sge);
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        return status;
    }

    uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
                                opcode, hdr_len + length,
                                rdma_raddr, rdma_rkey, fm_ce_se,
                                0, desc, buffer,
                                /* log the SGE only if DM logging produced one */
                                log_sge.num_sge ? &log_sge : NULL);
    return UCS_OK;
}
#endif
153 
/* PUT short entry point.
 * Fast path: payload fits the WQE inline segment (or no device memory is
 * available) - post inline. Slow path (HAVE_IBV_DM only): stage the payload
 * through device memory. Note the #if structure: without DM support the
 * function is just the inline call. */
ucs_status_t
uct_rc_mlx5_ep_put_short(uct_ep_h tl_ep, const void *buffer, unsigned length,
                         uint64_t remote_addr, uct_rkey_t rkey)
{
#if HAVE_IBV_DM
    uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_mlx5_iface_common_t);
    uct_rc_iface_t *rc_iface          = &iface->super;
    uct_rc_mlx5_ep_t *ep              = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t);
    ucs_status_t status;

    if (ucs_likely((length <= UCT_IB_MLX5_PUT_MAX_SHORT(0)) || !iface->dm.dm)) {
#endif
        return uct_rc_mlx5_ep_put_short_inline(tl_ep, buffer, length, remote_addr, rkey);
#if HAVE_IBV_DM
    }

    /* DM path: length must fit a single DM segment */
    UCT_CHECK_LENGTH(length, 0, iface->dm.seg_len, "put_short");
    UCT_RC_CHECK_RMA_RES(rc_iface, &ep->super);
    uct_rc_mlx5_ep_fence_put(iface, &ep->tx.wq, &rkey, &remote_addr,
                             ep->super.atomic_mr_offset);
    status = uct_rc_mlx5_ep_short_dm(ep, NULL, 0, buffer, length,
                                     MLX5_OPCODE_RDMA_WRITE,
                                     MLX5_WQE_CTRL_CQ_UPDATE,
                                     remote_addr, rkey);
    if (UCS_STATUS_IS_ERR(status)) {
        return status;
    }

    UCT_TL_EP_STAT_OP(&ep->super.super, PUT, SHORT, length);
    return UCS_OK;
#endif
}
186 
/* PUT bcopy: pack the payload via user callback into a TX descriptor bounce
 * buffer and post an RDMA WRITE. Returns the packed length (>= 0) on success,
 * or a negative status (via the resource-check macro) on failure. */
ssize_t uct_rc_mlx5_ep_put_bcopy(uct_ep_h tl_ep, uct_pack_callback_t pack_cb,
                                 void *arg, uint64_t remote_addr, uct_rkey_t rkey)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uct_rc_iface_send_desc_t *desc;
    size_t length;

    UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);
    /* Fetch a descriptor and invoke pack_cb; sets desc and length */
    UCT_RC_IFACE_GET_TX_PUT_BCOPY_DESC(&iface->super, &iface->super.tx.mp,
                                       desc, pack_cb, arg, length);
    uct_rc_mlx5_ep_fence_put(iface, &ep->tx.wq, &rkey, &remote_addr,
                             ep->super.atomic_mr_offset);

    /* desc + 1 is the start of the packed payload right after the descriptor */
    uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
                                MLX5_OPCODE_RDMA_WRITE, length, remote_addr,
                                rkey, MLX5_WQE_CTRL_CQ_UPDATE, 0, desc, desc + 1,
                                NULL);
    UCT_TL_EP_STAT_OP(&ep->super.super, PUT, BCOPY, length);
    return length;
}
207 
uct_rc_mlx5_ep_put_zcopy(uct_ep_h tl_ep,const uct_iov_t * iov,size_t iovcnt,uint64_t remote_addr,uct_rkey_t rkey,uct_completion_t * comp)208 ucs_status_t uct_rc_mlx5_ep_put_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov, size_t iovcnt,
209                                       uint64_t remote_addr, uct_rkey_t rkey,
210                                       uct_completion_t *comp)
211 {
212     UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
213     ucs_status_t status;
214 
215     UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_RMA_MAX_IOV(0),
216                        "uct_rc_mlx5_ep_put_zcopy");
217     UCT_CHECK_LENGTH(uct_iov_total_length(iov, iovcnt), 0, UCT_IB_MAX_MESSAGE_SIZE,
218                      "put_zcopy");
219     UCT_RC_CHECK_NUM_RDMA_READ(&iface->super);
220     uct_rc_mlx5_ep_fence_put(iface, &ep->tx.wq, &rkey, &remote_addr,
221                              ep->super.atomic_mr_offset);
222 
223     status = uct_rc_mlx5_ep_zcopy_post(ep, MLX5_OPCODE_RDMA_WRITE, iov, iovcnt,
224                                        0ul, 0, NULL, 0, remote_addr, rkey,
225                                        0ul, 0, 0, MLX5_WQE_CTRL_CQ_UPDATE,
226                                        uct_rc_ep_send_op_completion_handler,
227                                        comp);
228     UCT_TL_EP_STAT_OP_IF_SUCCESS(status, &ep->super.super, PUT, ZCOPY,
229                                  uct_iov_total_length(iov, iovcnt));
230     return status;
231 }
232 
uct_rc_mlx5_ep_get_bcopy(uct_ep_h tl_ep,uct_unpack_callback_t unpack_cb,void * arg,size_t length,uint64_t remote_addr,uct_rkey_t rkey,uct_completion_t * comp)233 ucs_status_t uct_rc_mlx5_ep_get_bcopy(uct_ep_h tl_ep,
234                                       uct_unpack_callback_t unpack_cb,
235                                       void *arg, size_t length,
236                                       uint64_t remote_addr, uct_rkey_t rkey,
237                                       uct_completion_t *comp)
238 {
239     uint8_t fm_ce_se = MLX5_WQE_CTRL_CQ_UPDATE;
240     UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
241     uct_rc_iface_send_desc_t *desc;
242 
243     UCT_CHECK_LENGTH(length, 0, iface->super.super.config.seg_size, "get_bcopy");
244     UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);
245     UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC(&iface->super, &iface->super.tx.mp, desc,
246                                        unpack_cb, comp, arg, length);
247 
248     uct_rc_mlx5_ep_fence_get(iface, &ep->tx.wq, &rkey, &fm_ce_se);
249     uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
250                                 MLX5_OPCODE_RDMA_READ, length, remote_addr,
251                                 rkey, fm_ce_se, 0, desc, desc + 1, NULL);
252     UCT_TL_EP_STAT_OP(&ep->super.super, GET, BCOPY, length);
253     UCT_RC_RDMA_READ_POSTED(&iface->super, length);
254     return UCS_INPROGRESS;
255 }
256 
/* GET zcopy: post an RDMA READ that lands directly in the user iov.
 * Minimum length excludes what could be returned inline in the CQE. */
ucs_status_t uct_rc_mlx5_ep_get_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov, size_t iovcnt,
                                      uint64_t remote_addr, uct_rkey_t rkey,
                                      uct_completion_t *comp)
{
    uint8_t fm_ce_se    = MLX5_WQE_CTRL_CQ_UPDATE;
    size_t total_length = uct_iov_total_length(iov, iovcnt);
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    ucs_status_t status;

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_RMA_MAX_IOV(0),
                       "uct_rc_mlx5_ep_get_zcopy");
    UCT_CHECK_LENGTH(total_length,
                     iface->super.super.config.max_inl_cqe[UCT_IB_DIR_TX] + 1,
                     iface->super.config.max_get_zcopy, "get_zcopy");
    /* RDMA READ consumes an outstanding-read credit */
    UCT_RC_CHECK_NUM_RDMA_READ(&iface->super);

    /* May strengthen fm_ce_se with a fence and resolve the rkey */
    uct_rc_mlx5_ep_fence_get(iface, &ep->tx.wq, &rkey, &fm_ce_se);
    status = uct_rc_mlx5_ep_zcopy_post(ep, MLX5_OPCODE_RDMA_READ, iov, iovcnt,
                                       total_length, 0, NULL, 0, remote_addr, rkey,
                                       0ul, 0, 0, fm_ce_se,
                                       uct_rc_ep_get_zcopy_completion_handler,
                                       comp);
    if (!UCS_STATUS_IS_ERR(status)) {
        UCT_TL_EP_STAT_OP(&ep->super.super, GET, ZCOPY, total_length);
        UCT_RC_RDMA_READ_POSTED(&iface->super, total_length);
    }
    return status;
}
285 
/* AM short entry point.
 * Fast path: header + payload fit the WQE inline segment (or no device
 * memory) - post inline. Slow path (HAVE_IBV_DM only): stage through device
 * memory with the AM header built in a stack cache structure. */
ucs_status_t
uct_rc_mlx5_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
                        const void *payload, unsigned length)
{
#if HAVE_IBV_DM
    uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_mlx5_iface_common_t);
    uct_rc_iface_t *rc_iface          = &iface->super;
    uct_rc_mlx5_ep_t *ep              = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t);
    ucs_status_t status;
    uct_rc_mlx5_dm_copy_data_t cache;

    if (ucs_likely((sizeof(uct_rc_mlx5_am_short_hdr_t) + length <= UCT_IB_MLX5_AM_MAX_SHORT(0)) ||
                   !iface->dm.dm)) {
#endif
        return uct_rc_mlx5_ep_am_short_inline(tl_ep, id, hdr, payload, length);
#if HAVE_IBV_DM
    }

    /* DM path: header + payload must fit a single DM segment */
    UCT_CHECK_LENGTH(length + sizeof(uct_rc_mlx5_am_short_hdr_t), 0,
                     iface->dm.seg_len, "am_short");
    UCT_CHECK_AM_ID(id);
    UCT_RC_CHECK_RES(&iface->super, &ep->super);
    UCT_RC_CHECK_FC(&iface->super, &ep->super, id);

    /* Build the AM header (id + 64-bit immediate) in the copy cache */
    uct_rc_mlx5_am_hdr_fill(&cache.am_hdr.rc_hdr, id);
    cache.am_hdr.am_hdr = hdr;

    status = uct_rc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.am_hdr), payload, length,
                                     MLX5_OPCODE_SEND,
                                     MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
                                     0, 0);
    if (UCS_STATUS_IS_ERR(status)) {
        return status;
    }

    UCT_TL_EP_STAT_OP(&ep->super.super, AM, SHORT, sizeof(cache.am_hdr) + length);
    UCT_RC_UPDATE_FC(rc_iface, &ep->super, id);
    return UCS_OK;
#endif
}
326 
/* AM bcopy: pack the AM header and payload into a TX descriptor bounce buffer
 * and post a SEND. Returns the packed payload length (>= 0) on success or a
 * negative status (via the check macros) on failure. */
ssize_t uct_rc_mlx5_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,
                                uct_pack_callback_t pack_cb, void *arg,
                                unsigned flags)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uct_rc_iface_send_desc_t *desc;
    size_t length;

    UCT_CHECK_AM_ID(id);
    UCT_RC_CHECK_RES(&iface->super, &ep->super);
    UCT_RC_CHECK_FC(&iface->super, &ep->super, id);
    /* Fetch a descriptor, fill the mlx5 AM header, then run pack_cb */
    UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC(&iface->super, &iface->super.tx.mp, desc,
                                      id, uct_rc_mlx5_am_hdr_fill, uct_rc_mlx5_hdr_t,
                                      pack_cb, arg, &length);

    /* Wire length includes the transport AM header */
    uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
                                MLX5_OPCODE_SEND, sizeof(uct_rc_mlx5_hdr_t) + length,
                                0, 0, MLX5_WQE_CTRL_SOLICITED, 0, desc, desc + 1,
                                NULL);
    UCT_TL_EP_STAT_OP(&ep->super.super, AM, BCOPY, length);
    UCT_RC_UPDATE_FC(&iface->super, &ep->super, id);
    return length;
}
350 
/* AM zcopy: post a SEND with an inline header plus user iov payload. */
ucs_status_t uct_rc_mlx5_ep_am_zcopy(uct_ep_h tl_ep, uint8_t id, const void *header,
                                     unsigned header_length, const uct_iov_t *iov,
                                     size_t iovcnt, unsigned flags,
                                     uct_completion_t *comp)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    ucs_status_t status;

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_IB_MLX5_AM_ZCOPY_MAX_IOV,
                       "uct_rc_mlx5_ep_am_zcopy");
    UCT_RC_MLX5_CHECK_AM_ZCOPY(id, header_length, uct_iov_total_length(iov, iovcnt),
                               iface->super.super.config.seg_size, 0);
    UCT_RC_CHECK_FC(&iface->super, &ep->super, id);

    status = uct_rc_mlx5_ep_zcopy_post(ep, MLX5_OPCODE_SEND, iov, iovcnt, 0ul,
                                       id, header, header_length, 0, 0, 0ul, 0, 0,
                                       MLX5_WQE_CTRL_SOLICITED,
                                       uct_rc_ep_send_op_completion_handler,
                                       comp);
    /* status >= 0 means non-error (UCS error codes are negative) */
    if (ucs_likely(status >= 0)) {
        UCT_TL_EP_STAT_OP(&ep->super.super, AM, ZCOPY,
                          header_length + uct_iov_total_length(iov, iovcnt));
        UCT_RC_UPDATE_FC(&iface->super, &ep->super, id);
    }
    return status;
}
377 
/* Post an atomic WQE (fetch-add / cswap / masked variants) whose response
 * lands in the descriptor buffer; the descriptor is queued for completion. */
static UCS_F_ALWAYS_INLINE void
uct_rc_mlx5_ep_atomic_post(uct_ep_h tl_ep, unsigned opcode,
                           uct_rc_iface_send_desc_t *desc, unsigned length,
                           uint64_t remote_addr, uct_rkey_t rkey,
                           uint64_t compare_mask, uint64_t compare,
                           uint64_t swap_mask, uint64_t swap_add)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    /* Resolve the indirect atomic rkey and adjust the remote address */
    uint32_t ib_rkey = uct_ib_resolve_atomic_rkey(rkey, ep->super.atomic_mr_offset,
                                                  &remote_addr);

    /* Record the sequence number so the response can be matched to this desc */
    desc->super.sn = ep->tx.wq.sw_pi;
    uct_rc_mlx5_txqp_dptr_post(iface, IBV_QPT_RC,
                               &ep->super.txqp, &ep->tx.wq,
                               opcode, desc + 1, length, &desc->lkey,
                               remote_addr, ib_rkey,
                               compare_mask, compare, swap_mask, swap_add,
                               NULL, NULL, 0, MLX5_WQE_CTRL_CQ_UPDATE,
                               0, INT_MAX, NULL);

    UCT_TL_EP_STAT_ATOMIC(&ep->super.super);
    uct_rc_txqp_add_send_op(&ep->super.txqp, &desc->super);
}
401 
/* Fetching atomic: allocate a fetch descriptor whose handler will copy the
 * (byte-swapped) hardware response into @a result, then post the atomic.
 * @param ext  non-zero for extended (masked) atomics, which use a different
 *             response format. Always returns UCS_INPROGRESS on success. */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_mlx5_ep_atomic_fop(uct_ep_h tl_ep, int opcode, void *result, int ext,
                          unsigned length, uint64_t remote_addr, uct_rkey_t rkey,
                          uint64_t compare_mask, uint64_t compare,
                          uint64_t swap_mask, uint64_t swap_add,
                          uct_completion_t *comp)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uct_rc_iface_send_desc_t *desc;

    UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);
    UCT_RC_IFACE_GET_TX_ATOMIC_FETCH_DESC(&iface->super,
                                          &iface->tx.atomic_desc_mp, desc,
                                          uct_rc_iface_atomic_handler(&iface->super,
                                                                      ext, length),
                                          result, comp);
    uct_rc_mlx5_ep_atomic_post(tl_ep, opcode, desc, length, remote_addr, rkey,
                               compare_mask, compare, swap_mask, swap_add);
    return UCS_INPROGRESS;
}
422 
/* Non-fetching atomic post: translate the UCT atomic opcode/operand into the
 * mlx5 opcode and compare/swap masks, then post with a throwaway descriptor
 * (the result is discarded). */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
                              uint64_t value, uint64_t remote_addr,
                              uct_rkey_t rkey)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uct_rc_iface_send_desc_t *desc;
    int op;
    uint64_t compare_mask;
    uint64_t compare;
    uint64_t swap_mask;
    uint64_t swap;
    int      ext; /* not used here */
    ucs_status_t status;

    UCT_RC_CHECK_RMA_RES(&iface->super, &ep->super);
    /* Validate the opcode is in the supported non-fetching op set */
    UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_OPS);

    /* Translate (opcode, size, value) to hardware op + masks */
    status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op,
                                                  &compare_mask, &compare,
                                                  &swap_mask, &swap, &ext);
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        return status;
    }

    UCT_RC_IFACE_GET_TX_ATOMIC_DESC(&iface->super, &iface->tx.atomic_desc_mp,
                                    desc);

    uct_rc_mlx5_ep_atomic_post(tl_ep, op, desc, size, remote_addr, rkey,
                               compare_mask, compare, swap_mask, swap);
    return UCS_OK;
}
455 
/* Fetching atomic post: translate the UCT atomic opcode/operand into the
 * mlx5 opcode and masks, then delegate to the fetch-op helper which delivers
 * the old value into @a result. */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_atomic_fop_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
                               uint64_t value, void *result,
                               uint64_t remote_addr, uct_rkey_t rkey,
                               uct_completion_t *comp)
{
    int op;
    uint64_t compare_mask;
    uint64_t compare;
    uint64_t swap_mask;
    uint64_t swap;
    int      ext;
    ucs_status_t status;

    /* Validate the opcode is in the supported fetching op set */
    UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_FOPS);

    status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op, &compare_mask,
                                                  &compare, &swap_mask, &swap, &ext);
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        return status;
    }

    return uct_rc_mlx5_ep_atomic_fop(tl_ep, op, result, ext, size, remote_addr, rkey,
                                     compare_mask, compare, swap_mask, swap, comp);
}
481 
uct_rc_mlx5_ep_atomic32_post(uct_ep_h ep,unsigned opcode,uint32_t value,uint64_t remote_addr,uct_rkey_t rkey)482 ucs_status_t uct_rc_mlx5_ep_atomic32_post(uct_ep_h ep, unsigned opcode, uint32_t value,
483                                           uint64_t remote_addr, uct_rkey_t rkey)
484 {
485     return uct_rc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
486 }
487 
uct_rc_mlx5_ep_atomic64_post(uct_ep_h ep,unsigned opcode,uint64_t value,uint64_t remote_addr,uct_rkey_t rkey)488 ucs_status_t uct_rc_mlx5_ep_atomic64_post(uct_ep_h ep, unsigned opcode, uint64_t value,
489                                           uint64_t remote_addr, uct_rkey_t rkey)
490 {
491     return uct_rc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
492 }
493 
uct_rc_mlx5_ep_atomic64_fetch(uct_ep_h ep,uct_atomic_op_t opcode,uint64_t value,uint64_t * result,uint64_t remote_addr,uct_rkey_t rkey,uct_completion_t * comp)494 ucs_status_t uct_rc_mlx5_ep_atomic64_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
495                                            uint64_t value, uint64_t *result,
496                                            uint64_t remote_addr, uct_rkey_t rkey,
497                                            uct_completion_t *comp)
498 {
499     return uct_rc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
500                                           remote_addr, rkey, comp);
501 }
502 
uct_rc_mlx5_ep_atomic32_fetch(uct_ep_h ep,uct_atomic_op_t opcode,uint32_t value,uint32_t * result,uint64_t remote_addr,uct_rkey_t rkey,uct_completion_t * comp)503 ucs_status_t uct_rc_mlx5_ep_atomic32_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
504                                            uint32_t value, uint32_t *result,
505                                            uint64_t remote_addr, uct_rkey_t rkey,
506                                            uct_completion_t *comp)
507 {
508     return uct_rc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
509                                           remote_addr, rkey, comp);
510 }
511 
uct_rc_mlx5_ep_atomic_cswap64(uct_ep_h tl_ep,uint64_t compare,uint64_t swap,uint64_t remote_addr,uct_rkey_t rkey,uint64_t * result,uct_completion_t * comp)512 ucs_status_t uct_rc_mlx5_ep_atomic_cswap64(uct_ep_h tl_ep, uint64_t compare, uint64_t swap,
513                                            uint64_t remote_addr, uct_rkey_t rkey,
514                                            uint64_t *result, uct_completion_t *comp)
515 {
516     return uct_rc_mlx5_ep_atomic_fop(tl_ep, MLX5_OPCODE_ATOMIC_CS, result, 0, sizeof(uint64_t),
517                                      remote_addr, rkey, 0, htobe64(compare),
518                                      UINT64_MAX, htobe64(swap), comp);
519 }
520 
uct_rc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep,uint32_t compare,uint32_t swap,uint64_t remote_addr,uct_rkey_t rkey,uint32_t * result,uct_completion_t * comp)521 ucs_status_t uct_rc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep, uint32_t compare, uint32_t swap,
522                                            uint64_t remote_addr, uct_rkey_t rkey,
523                                            uint32_t *result, uct_completion_t *comp)
524 {
525     return uct_rc_mlx5_ep_atomic_fop(tl_ep, MLX5_OPCODE_ATOMIC_MASKED_CS, result, 1,
526                                      sizeof(uint32_t), remote_addr, rkey, UCS_MASK(32),
527                                      htonl(compare), UINT64_MAX, htonl(swap), comp);
528 }
529 
uct_rc_mlx5_ep_fence(uct_ep_h tl_ep,unsigned flags)530 ucs_status_t uct_rc_mlx5_ep_fence(uct_ep_h tl_ep, unsigned flags)
531 {
532     uct_rc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t);
533 
534     return uct_rc_ep_fence(tl_ep, &ep->tx.wq.fi, 1);
535 }
536 
/* Flush the endpoint: complete (or, with CANCEL, abort) all outstanding
 * sends. Returns UCS_OK when already flushed, UCS_INPROGRESS when a
 * completion will be reported via @a comp, or an error. */
ucs_status_t uct_rc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
                                  uct_completion_t *comp)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    ucs_status_t status;
    uint16_t sn;

    if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
        /* Cancel: purge pending requests and force the EP into failure
         * handling instead of waiting for outstanding operations */
        uct_ep_pending_purge(&ep->super.super.super, NULL, 0);
        uct_rc_mlx5_ep_handle_failure(ep, UCS_ERR_CANCELED);
        return UCS_OK;
    }

    status = uct_rc_ep_flush(&ep->super, ep->tx.wq.bb_max, flags);
    if (status != UCS_INPROGRESS) {
        return status;
    }

    if (uct_rc_txqp_unsignaled(&ep->super.txqp) != 0) {
        /* Unsignaled sends in flight: post a NOP so a CQE is generated once
         * everything posted before it has completed */
        sn = ep->tx.wq.sw_pi;
        UCT_RC_CHECK_RES(&iface->super, &ep->super);
        uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
                                     &ep->super.txqp, &ep->tx.wq,
                                     MLX5_OPCODE_NOP, NULL, 0,
                                     0, 0, 0,
                                     0, 0,
                                     NULL, NULL, 0, 0,
                                     INT_MAX);
    } else {
        /* Everything outstanding is already signaled; wait on the last
         * signaled sequence number */
        sn = ep->tx.wq.sig_pi;
    }

    return uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.super,
                                      &ep->super.txqp, comp, sn);
}
572 
/* Send a flow-control control message. For RC this is only ever a pure
 * credit grant - other FC information piggybacks on regular AMs. */
ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
                                    uct_rc_fc_request_t *req)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);

    /* In RC only PURE grant is sent as a separate message. Other FC
     * messages are bundled with AM. */
    ucs_assert(op == UCT_RC_EP_FC_PURE_GRANT);

    UCT_RC_CHECK_RES(&iface->super, &ep->super);
    /* RAW flag: header is sent as-is, not treated as an AM id */
    uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
                                 &ep->super.txqp, &ep->tx.wq,
                                 MLX5_OPCODE_SEND|UCT_RC_MLX5_OPCODE_FLAG_RAW,
                                 NULL, 0,
                                 UCT_RC_EP_FC_PURE_GRANT, 0, 0,
                                 0, 0,
                                 NULL, NULL, 0, 0,
                                 INT_MAX);
    return UCS_OK;
}
593 
/* Pack this endpoint's address (QP number, atomic MR id, and - when tag
 * matching is enabled - the TM QP number) into @a addr for the peer. */
ucs_status_t uct_rc_mlx5_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uct_rc_mlx5_ep_address_t *rc_addr = (uct_rc_mlx5_ep_address_t*)addr;
    uct_ib_md_t *md                   = uct_ib_iface_md(ucs_derived_of(
                                        tl_ep->iface, uct_ib_iface_t));

    /* QP numbers are 24-bit on InfiniBand */
    uct_ib_pack_uint24(rc_addr->qp_num, ep->tx.wq.super.qp_num);
    uct_ib_mlx5_md_get_atomic_mr_id(md, &rc_addr->atomic_mr_id);

    if (UCT_RC_MLX5_TM_ENABLED(iface)) {
        uct_ib_pack_uint24(rc_addr->tm_qp_num, ep->tm_qp.qp_num);
    }

    return UCS_OK;
}
610 
/*
 * Format a received RC/TM packet header into a human-readable string for
 * debug tracing.
 *
 * When HW tag-matching is compiled in and the header carries a TM opcode,
 * prints the TM-specific fields (tag, app_ctx, and for RNDV also the remote
 * va/len/rkey). Otherwise falls through to the generic RC AM dump, with the
 * TM header bytes stripped from the reported length.
 *
 * Fix: app_ctx, rvh->len and rvh->rkey are unsigned 32-bit values; print
 * them with "%u"/"%x" rather than "%d" (signed/unsigned format mismatch).
 */
void uct_rc_mlx5_common_packet_dump(uct_base_iface_t *iface, uct_am_trace_type_t type,
                                    void *data, size_t length, size_t valid_length,
                                    char *buffer, size_t max)
{
    uct_rc_mlx5_hdr_t *rch = data;

#if IBV_HW_TM
    if (rch->tmh_opcode != IBV_TMH_NO_TAG) {
        struct ibv_tmh *tmh = ucs_unaligned_ptr(rch);
        struct ibv_rvh *rvh = (void*)(tmh + 1);
        uct_tag_t tag;
        uint32_t app_ctx;

        tag     = tmh->tag;
        app_ctx = tmh->app_ctx;

        switch (rch->tmh_opcode) {
        case IBV_TMH_EAGER:
            snprintf(buffer, max, " EAGER tag %lx app_ctx %u", tag, app_ctx);
            return;
        case IBV_TMH_RNDV:
            /* rvh fields are in network byte order on the wire */
            snprintf(buffer, max, " RNDV tag %lx app_ctx %u va 0x%lx len %u rkey %x",
                     tag, app_ctx, be64toh(rvh->va), ntohl(rvh->len), ntohl(rvh->rkey));
            return;
        case IBV_TMH_FIN:
            snprintf(buffer, max, " FIN tag %lx app_ctx %u", tag, app_ctx);
            return;
        default:
            break;
        }
    }
#endif

    /* Not a TM packet - dump the plain RC header which follows the TM byte */
    data = &rch->rc_hdr;
    /* coverity[overrun-buffer-val] */
    uct_rc_ep_packet_dump(iface, type, data, length - UCS_PTR_BYTE_DIFF(rch, data),
                          valid_length, buffer, max);
}
649 
650 static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_connect_qp(uct_rc_mlx5_iface_common_t * iface,uct_ib_mlx5_qp_t * qp,uint32_t qp_num,struct ibv_ah_attr * ah_attr,enum ibv_mtu path_mtu)651 uct_rc_mlx5_ep_connect_qp(uct_rc_mlx5_iface_common_t *iface,
652                           uct_ib_mlx5_qp_t *qp, uint32_t qp_num,
653                           struct ibv_ah_attr *ah_attr, enum ibv_mtu path_mtu)
654 {
655     uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.md, uct_ib_mlx5_md_t);
656 
657     ucs_assert(path_mtu != UCT_IB_ADDRESS_INVALID_PATH_MTU);
658     if (md->flags & UCT_IB_MLX5_MD_FLAG_DEVX) {
659         return uct_rc_mlx5_iface_common_devx_connect_qp(iface, qp, qp_num,
660                                                         ah_attr, path_mtu);
661     } else {
662         return uct_rc_iface_qp_connect(&iface->super, qp->verbs.qp, qp_num,
663                                        ah_attr, path_mtu);
664     }
665 }
666 
/*
 * Connect this endpoint to a remote endpoint address.
 *
 * @param tl_ep     Local endpoint.
 * @param dev_addr  Remote device address (uct_ib_address_t).
 * @param ep_addr   Remote endpoint address (uct_rc_mlx5_ep_address_t).
 *
 * With HW tag-matching, two QPs are cross-connected: the local TM QP is
 * connected to the remote send QP, and the local send QP to the remote
 * TM QP (the QP owned by the remote device, bound to its XRQ).
 */
ucs_status_t uct_rc_mlx5_ep_connect_to_ep(uct_ep_h tl_ep,
                                          const uct_device_addr_t *dev_addr,
                                          const uct_ep_addr_t *ep_addr)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    const uct_ib_address_t *ib_addr = (const uct_ib_address_t *)dev_addr;
    const uct_rc_mlx5_ep_address_t *rc_addr = (const uct_rc_mlx5_ep_address_t*)ep_addr;
    uint32_t qp_num;
    struct ibv_ah_attr ah_attr;
    enum ibv_mtu path_mtu;
    ucs_status_t status;

    /* Resolve address-handle attributes and path MTU from the remote
     * device address */
    uct_ib_iface_fill_ah_attr_from_addr(&iface->super.super, ib_addr,
                                        ep->super.path_index, &ah_attr,
                                        &path_mtu);
    ucs_assert(path_mtu != UCT_IB_ADDRESS_INVALID_PATH_MTU);

    if (UCT_RC_MLX5_TM_ENABLED(iface)) {
        /* For HW TM we need 2 QPs, one of which will be used by the device for
         * RNDV offload (for issuing RDMA reads and sending RNDV ACK). No WQEs
         * should be posted to the send side of the QP which is owned by device. */
        status = uct_rc_mlx5_ep_connect_qp(iface, &ep->tm_qp,
                                           uct_ib_unpack_uint24(rc_addr->qp_num),
                                           &ah_attr, path_mtu);
        if (status != UCS_OK) {
            return status;
        }

        /* Need to connect local ep QP to the one owned by device
         * (and bound to XRQ) on the peer. */
        qp_num = uct_ib_unpack_uint24(rc_addr->tm_qp_num);
    } else {
        qp_num = uct_ib_unpack_uint24(rc_addr->qp_num);
    }

    status = uct_rc_mlx5_ep_connect_qp(iface, &ep->tx.wq.super, qp_num,
                                       &ah_attr, path_mtu);
    if (status != UCS_OK) {
        return status;
    }

    /* Remember the remote atomic MR offset for atomic operations */
    ep->super.atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id);

    return UCS_OK;
}
712 
713 #if IBV_HW_TM
714 
uct_rc_mlx5_ep_tag_rndv_cancel(uct_ep_h tl_ep,void * op)715 ucs_status_t uct_rc_mlx5_ep_tag_rndv_cancel(uct_ep_h tl_ep, void *op)
716 {
717     uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(tl_ep->iface,
718                                                        uct_rc_mlx5_iface_common_t);
719 
720     uint32_t op_index = (uint32_t)((uint64_t)op);
721     ucs_ptr_array_remove(&iface->tm.rndv_comps, op_index);
722     return UCS_OK;
723 }
724 
/*
 * Post a tag-matching eager-short send with the payload inlined into the
 * WQE (no memory registration, no completion callback).
 * The payload must fit in an inline segment together with the TM header.
 */
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_tag_eager_short_inline(uct_ep_h tl_ep, uct_tag_t tag,
                                      const void *data, size_t length)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    /* Account for the TM header that precedes the payload in the WQE */
    UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
                     UCT_IB_MLX5_AM_MAX_SHORT(0), "tag_short");
    UCT_RC_CHECK_RES(&iface->super, &ep->super);

    uct_rc_mlx5_txqp_tag_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
                                     &ep->tx.wq, MLX5_OPCODE_SEND, data, length,
                                     NULL, tag, 0, IBV_TMH_EAGER, 0, NULL,
                                     NULL, 0, NULL, 0, MLX5_WQE_CTRL_SOLICITED);

    UCT_TL_EP_STAT_OP(&ep->super.super, TAG, SHORT, length);

    return UCS_OK;
}
743 
/*
 * Tag-matching eager-short send.
 *
 * Without device-memory (DM) support this always takes the inline path.
 * With DM support, payloads that do not fit inline (but do fit in a DM
 * segment) are staged through device memory instead.
 */
ucs_status_t uct_rc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag,
                                            const void *data, size_t length)
{
#if HAVE_IBV_DM
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uct_rc_mlx5_dm_copy_data_t cache;
    ucs_status_t status;

    /* Inline path when the payload (plus TM header) fits in an inline WQE
     * segment, or when no DM is available */
    if (ucs_likely((sizeof(struct ibv_tmh) + length <= UCT_IB_MLX5_AM_MAX_SHORT(0)) ||
                   !iface->dm.dm)) {
#endif
        return uct_rc_mlx5_ep_tag_eager_short_inline(tl_ep, tag, data, length);
#if HAVE_IBV_DM
    }

    UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
                     iface->dm.seg_len, "tag_short");
    UCT_RC_CHECK_RES(&iface->super, &ep->super);

    /* Build the TM header in a stack cache; it is copied to DM together
     * with the payload */
    uct_rc_mlx5_fill_tmh(ucs_unaligned_ptr(&cache.tm_hdr), tag, 0, IBV_TMH_EAGER);

    status = uct_rc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.tm_hdr), data, length,
                                   MLX5_OPCODE_SEND,
                                   MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
                                   0, 0);
    if (!UCS_STATUS_IS_ERR(status)) {
        UCT_TL_EP_STAT_OP(&ep->super.super, TAG, SHORT, length);
    }

    return status;
#endif
}
776 
/*
 * Tag-matching eager buffer-copy send.
 *
 * Packs the payload into a TX descriptor via 'pack_cb', prepends the TM
 * header, and posts a SEND (with immediate when 'imm' is used - the opcode
 * and 32-bit immediate are derived by UCT_RC_MLX5_FILL_TM_IMM).
 *
 * @return Number of payload bytes packed, or an error status cast to
 *         ssize_t (via the resource-check macro).
 */
ssize_t uct_rc_mlx5_ep_tag_eager_bcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                       uint64_t imm,
                                       uct_pack_callback_t pack_cb,
                                       void *arg, unsigned flags)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uct_rc_iface_send_desc_t *desc;
    uint32_t app_ctx, ib_imm;
    int opcode;
    size_t length;

    UCT_RC_CHECK_RES(&iface->super, &ep->super);

    /* Split the 64-bit user immediate into app_ctx + 32-bit IB immediate,
     * and pick SEND vs SEND_IMM accordingly */
    UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND,
                             _IMM);

    UCT_RC_MLX5_IFACE_GET_TM_BCOPY_DESC(&iface->super, iface->tm.bcopy_mp,
                                        desc, tag, app_ctx, pack_cb, arg, length);

    /* Wire length includes the TM header preceding the packed payload */
    uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
                                opcode, sizeof(struct ibv_tmh) + length,
                                0, 0, MLX5_WQE_CTRL_SOLICITED, ib_imm,
                                desc, desc + 1, NULL);

    UCT_TL_EP_STAT_OP(&ep->super.super, TAG, BCOPY, length);

    return length;
}
805 
/*
 * Tag-matching eager zero-copy send.
 *
 * Posts the user iov directly from registered memory; 'comp' is invoked
 * when the send completes. The 64-bit 'imm' is split into app_ctx plus a
 * 32-bit IB immediate by UCT_RC_MLX5_FILL_TM_IMM.
 */
ucs_status_t uct_rc_mlx5_ep_tag_eager_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
                                            uint64_t imm, const uct_iov_t *iov,
                                            size_t iovcnt, unsigned flags,
                                            uct_completion_t *comp)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    uint32_t app_ctx, ib_imm;
    int opcode;

    UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_TM_EAGER_ZCOPY_MAX_IOV(0),
                       "uct_rc_mlx5_ep_tag_eager_zcopy");
    UCT_RC_CHECK_ZCOPY_DATA(sizeof(struct ibv_tmh),
                            uct_iov_total_length(iov, iovcnt),
                            iface->tm.max_zcopy);

    UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND,
                             _IMM);

    UCT_TL_EP_STAT_OP(&ep->super.super, TAG, ZCOPY,
                      uct_iov_total_length(iov, iovcnt));

    return uct_rc_mlx5_ep_zcopy_post(ep, opcode|UCT_RC_MLX5_OPCODE_FLAG_TM,
                                     iov, iovcnt, 0ul, 0, "", 0, 0, 0,
                                     tag, app_ctx, ib_imm,
                                     MLX5_WQE_CTRL_SOLICITED,
                                     uct_rc_ep_send_op_completion_handler,
                                     comp);
}
834 
uct_rc_mlx5_ep_tag_rndv_zcopy(uct_ep_h tl_ep,uct_tag_t tag,const void * header,unsigned header_length,const uct_iov_t * iov,size_t iovcnt,unsigned flags,uct_completion_t * comp)835 ucs_status_ptr_t uct_rc_mlx5_ep_tag_rndv_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
836                                                const void *header,
837                                                unsigned header_length,
838                                                const uct_iov_t *iov,
839                                                size_t iovcnt, unsigned flags,
840                                                uct_completion_t *comp)
841 {
842     UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
843     unsigned tm_hdr_len   = sizeof(struct ibv_tmh) +
844                             sizeof(struct ibv_rvh);
845     uint32_t op_index;
846 
847     UCT_RC_MLX5_CHECK_RNDV_PARAMS(iovcnt, header_length, tm_hdr_len,
848                                    UCT_IB_MLX5_AM_MAX_SHORT(0),
849                                    iface->tm.max_rndv_data +
850                                    UCT_RC_MLX5_TMH_PRIV_LEN);
851     UCT_RC_MLX5_CHECK_RES_PTR(iface, ep);
852 
853     op_index = uct_rc_mlx5_tag_get_op_id(iface, comp);
854 
855     uct_rc_mlx5_txqp_tag_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
856                                      &ep->tx.wq, MLX5_OPCODE_SEND, header,
857                                      header_length, iov, tag, op_index,
858                                      IBV_TMH_RNDV, 0, NULL, NULL, 0,
859                                      NULL, 0, MLX5_WQE_CTRL_SOLICITED);
860 
861     return (ucs_status_ptr_t)((uint64_t)op_index);
862 }
863 
/*
 * Send a software RNDV request: an eager message (SEND with immediate)
 * carrying only the user header inline. The header plus TM header must fit
 * in an inline WQE segment.
 */
ucs_status_t uct_rc_mlx5_ep_tag_rndv_request(uct_ep_h tl_ep, uct_tag_t tag,
                                             const void* header,
                                             unsigned header_length,
                                             unsigned flags)
{
    UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    UCT_CHECK_LENGTH(header_length + sizeof(struct ibv_tmh), 0,
                     UCT_IB_MLX5_AM_MAX_SHORT(0), "tag_rndv_request");
    UCT_RC_CHECK_RES(&iface->super, &ep->super);

    /* SEND_IMM distinguishes a SW RNDV request from a plain eager message
     * on the receive side - TODO confirm against the receive-path handler */
    uct_rc_mlx5_txqp_tag_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
                                     &ep->tx.wq, MLX5_OPCODE_SEND_IMM, header,
                                     header_length, NULL, tag, 0,
                                     IBV_TMH_EAGER, 0, NULL, NULL, 0,
                                     NULL, 0, MLX5_WQE_CTRL_SOLICITED);
    return UCS_OK;
}
881 #endif /* IBV_HW_TM */
882 
/*
 * Endpoint constructor.
 *
 * Creates the send QP first (its number is required by the uct_rc_ep_t
 * super constructor), registers the QP(s) with the iface, and with HW
 * tag-matching also creates a second, zero-length-send-queue QP that the
 * device uses for RNDV offload.
 */
UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, const uct_ep_params_t *params)
{
    uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(params->iface,
                                                       uct_rc_mlx5_iface_common_t);
    uct_ib_mlx5_md_t *md              = ucs_derived_of(iface->super.super.super.md,
                                                       uct_ib_mlx5_md_t);
    uct_ib_mlx5_qp_attr_t attr = {};
    ucs_status_t status;

    /* Need to create QP before super constructor to get QP number */
    uct_rc_mlx5_iface_fill_attr(iface, &attr, iface->super.config.tx_qp_len,
                                &iface->rx.srq);
    uct_ib_exp_qp_fill_attr(&iface->super.super, &attr.super);
    status = uct_rc_mlx5_iface_create_qp(iface, &self->tx.wq.super, &self->tx.wq, &attr);
    if (status != UCS_OK) {
        return status;
    }

    UCS_CLASS_CALL_SUPER_INIT(uct_rc_ep_t, &iface->super,
                              self->tx.wq.super.qp_num, params);

    /* Plain verbs QPs need the generic RC init (DEVX QPs are set up by
     * create_qp itself) */
    if (self->tx.wq.super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS) {
        status = uct_rc_iface_qp_init(&iface->super, self->tx.wq.super.verbs.qp);
        if (status != UCS_OK) {
            goto err;
        }
    }

    uct_rc_iface_add_qp(&iface->super, &self->super, self->tx.wq.super.qp_num);

    if (UCT_RC_MLX5_TM_ENABLED(iface)) {
        /* Send queue of this QP will be used by FW for HW RNDV. Driver requires
         * such a QP to be initialized with zero send queue length. */
        memset(&attr, 0, sizeof(attr));
        uct_rc_mlx5_iface_fill_attr(iface, &attr, 0, &iface->rx.srq);
        uct_ib_exp_qp_fill_attr(&iface->super.super, &attr.super);
        status = uct_rc_mlx5_iface_create_qp(iface, &self->tm_qp, NULL, &attr);
        if (status != UCS_OK) {
            goto err;
        }

        uct_rc_iface_add_qp(&iface->super, &self->super, self->tm_qp.qp_num);
    }

    /* Cap the work-queue depth by the iface-wide budget of building blocks */
    self->tx.wq.bb_max = ucs_min(self->tx.wq.bb_max, iface->tx.bb_max);
    self->mp.free      = 1;
    uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max);
    return UCS_OK;

err:
    /* Only the TX QP exists on the error paths above */
    uct_ib_mlx5_destroy_qp(md, &self->tx.wq.super);
    return status;
}
936 
/*
 * Drain a QP before destruction: flush its outstanding SRQ receives into
 * the CQ, reap them, and keep the SW CQ indexes consistent with the driver.
 * Statement order here is critical - do not reorder.
 */
static void uct_rc_mlx5_ep_clean_qp(uct_rc_mlx5_ep_t *ep, uct_ib_mlx5_qp_t *qp)
{
    uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(ep->super.super.super.iface,
                                                       uct_rc_mlx5_iface_common_t);
    uct_ib_mlx5_md_t *md              = ucs_derived_of(iface->super.super.super.md,
                                                       uct_ib_mlx5_md_t);

    /* Make the HW generate CQEs for all in-progress SRQ receives from the QP,
     * so we clean them all before ibv_modify_qp() can see them.
     */
#if HAVE_DECL_IBV_CMD_MODIFY_QP && !HAVE_DEVX
    struct ibv_qp_attr qp_attr;
    struct ibv_modify_qp cmd;
    int ret;

    /* Bypass mlx5 driver, and go directly to command interface, to avoid
     * cleaning the CQ in mlx5 driver
     */
    memset(&qp_attr, 0, sizeof(qp_attr));
    qp_attr.qp_state = IBV_QPS_RESET;
    ret = ibv_cmd_modify_qp(qp->verbs.qp, &qp_attr, IBV_QP_STATE, &cmd, sizeof(cmd));
    if (ret) {
        ucs_warn("modify qp 0x%x to RESET failed: %m", qp->qp_num);
    }
#else
    /* Moving the QP to ERR flushes pending receives as error CQEs */
    (void)uct_ib_mlx5_modify_qp_state(md, qp, IBV_QPS_ERR);
#endif

    /* Reap the flushed receives and return the credits to the SRQ */
    iface->super.rx.srq.available += uct_rc_mlx5_iface_commom_clean(
            &iface->cq[UCT_IB_DIR_RX],
            &iface->rx.srq, qp->qp_num);

    /* Synchronize CQ index with the driver, since it would remove pending
     * completions for this QP (both send and receive) during ibv_destroy_qp().
     */
    uct_rc_mlx5_iface_common_update_cqs_ci(iface, &iface->super.super);
    (void)uct_ib_mlx5_modify_qp_state(md, qp, IBV_QPS_RESET);
    uct_rc_mlx5_iface_common_sync_cqs_ci(iface, &iface->super.super);
}
976 
/*
 * Endpoint destructor: tear down the TX work queue and QP(s), draining
 * pending SRQ receives first, and return all send credits.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_ep_t)
{
    uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(self->super.super.super.iface,
                                                       uct_rc_mlx5_iface_common_t);
    uct_ib_mlx5_md_t *md              = ucs_derived_of(iface->super.super.super.md,
                                                       uct_ib_mlx5_md_t);

    uct_ib_mlx5_txwq_cleanup(&self->tx.wq);
    uct_rc_mlx5_ep_clean_qp(self, &self->tx.wq.super);
#if IBV_HW_TM
    if (UCT_RC_MLX5_TM_ENABLED(iface)) {
        /* The TM QP is device-owned (RNDV offload); drain and destroy it too */
        uct_rc_mlx5_ep_clean_qp(self, &self->tm_qp);
        uct_ib_mlx5_iface_put_res_domain(&self->tm_qp);
        uct_rc_iface_remove_qp(&iface->super, self->tm_qp.qp_num);
        uct_ib_mlx5_destroy_qp(md, &self->tm_qp);
    }
#endif

    /* All DM/mempool resources must already be released */
    ucs_assert(self->mp.free == 1);

    /* Return all credits if user do flush(UCT_FLUSH_FLAG_CANCEL) before
     * ep_destroy.
     */
    uct_rc_txqp_available_add(&self->super.txqp,
                              self->tx.wq.bb_max -
                              uct_rc_txqp_available(&self->super.txqp));

    /* NOTE(review): per-EP call on the iface-level SRQ - presumably a no-op
     * or type check for non-verbs SRQs; confirm this is intended here */
    uct_ib_mlx5_verbs_srq_cleanup(&iface->rx.srq, iface->rx.srq.verbs.srq);

    uct_rc_iface_remove_qp(&iface->super, self->tx.wq.super.qp_num);
    uct_ib_mlx5_destroy_qp(md, &self->tx.wq.super);
}
1009 
/*
 * Handle a fatal transport error on the endpoint: purge all outstanding
 * sends with 'status', restore the iface CQ credit budget, and delegate the
 * endpoint-failure policy to the iface ops.
 */
ucs_status_t uct_rc_mlx5_ep_handle_failure(uct_rc_mlx5_ep_t *ep,
                                           ucs_status_t status)
{
    uct_ib_iface_t *ib_iface = ucs_derived_of(ep->super.super.super.iface,
                                              uct_ib_iface_t);
    uct_rc_iface_t *rc_iface = ucs_derived_of(ib_iface, uct_rc_iface_t);

    uct_rc_txqp_purge_outstanding(&ep->super.txqp, status, 0);
    /* poll_cqe for mlx5 returns NULL in case of failure and cq_available
       is not updated for the error cqe and all outstanding wqes */
    rc_iface->tx.cq_available += ep->tx.wq.bb_max -
                                 uct_rc_txqp_available(&ep->super.txqp);
    return ib_iface->ops->set_ep_failed(ib_iface, &ep->super.super.super,
                                        status);
}
1025 
/*
 * Iface-ops callback: transition the endpoint into the failed state through
 * the generic uct_set_ep_failed() flow for this endpoint class.
 */
ucs_status_t uct_rc_mlx5_ep_set_failed(uct_ib_iface_t *iface, uct_ep_h ep,
                                       ucs_status_t status)
{
    ucs_class_t *ep_cls = &UCS_CLASS_NAME(uct_rc_mlx5_ep_t);

    return uct_set_ep_failed(ep_cls, ep, &iface->super.super, status);
}
1032 
/* Instantiate the class: uct_rc_mlx5_ep_t derives from uct_rc_ep_t, with
 * standard new/delete entry points taking uct_ep_params_t. */
UCS_CLASS_DEFINE(uct_rc_mlx5_ep_t, uct_rc_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_rc_mlx5_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_rc_mlx5_ep_t, uct_ep_t);
1036