/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "rc_iface.h"
#include "rc_ep.h"

#include <ucs/arch/cpu.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/type/class.h>


static const char *uct_rc_fence_mode_values[] = {
    [UCT_RC_FENCE_MODE_NONE]   = "none",
    [UCT_RC_FENCE_MODE_WEAK]   = "weak",
    [UCT_RC_FENCE_MODE_AUTO]   = "auto",
    [UCT_RC_FENCE_MODE_LAST]   = NULL
};

ucs_config_field_t uct_rc_iface_common_config_table[] = {
  {UCT_IB_CONFIG_PREFIX, "RX_INLINE=64;TX_INLINE_RESP=64;RX_QUEUE_LEN=4095;SEG_SIZE=8256", NULL,
   ucs_offsetof(uct_rc_iface_common_config_t, super),
   UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},

  {"MAX_RD_ATOMIC", "4",
   "Maximal number of outstanding read or atomic replies",
   ucs_offsetof(uct_rc_iface_common_config_t, max_rd_atomic), UCS_CONFIG_TYPE_UINT},

  {"TIMEOUT", "1.0s",
   "Transport timeout",
   ucs_offsetof(uct_rc_iface_common_config_t, tx.timeout), UCS_CONFIG_TYPE_TIME},

  {"RETRY_COUNT", "7",
   "Transport retries",
   ucs_offsetof(uct_rc_iface_common_config_t, tx.retry_count), UCS_CONFIG_TYPE_UINT},

  {"RNR_TIMEOUT", "1ms",
   "RNR timeout",
   ucs_offsetof(uct_rc_iface_common_config_t, tx.rnr_timeout), UCS_CONFIG_TYPE_TIME},

  {"RNR_RETRY_COUNT", "7",
   "RNR retries",
   ucs_offsetof(uct_rc_iface_common_config_t, tx.rnr_retry_count), UCS_CONFIG_TYPE_UINT},

  {"FC_ENABLE", "y",
   "Enable the flow control protocol to prevent the sender from overwhelming\n"
   "the receiver, thus avoiding the RC RnR backoff timer.",
   ucs_offsetof(uct_rc_iface_common_config_t, fc.enable), UCS_CONFIG_TYPE_BOOL},

  {"FC_WND_SIZE", "512",
   "The size of the flow control window per endpoint. Limits the number of AM\n"
   "messages which can be sent without acknowledgment.",
   ucs_offsetof(uct_rc_iface_common_config_t, fc.wnd_size), UCS_CONFIG_TYPE_UINT},

  {"FC_HARD_THRESH", "0.25",
   "Threshold for sending a hard request for FC credits to the peer. This value\n"
   "is a fraction of the FC_WND_SIZE value (must be > 0 and < 1).",
   ucs_offsetof(uct_rc_iface_common_config_t, fc.hard_thresh), UCS_CONFIG_TYPE_DOUBLE},

#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
  {"OOO_RW", "n",
   "Enable out-of-order RDMA data placement",
   ucs_offsetof(uct_rc_iface_common_config_t, ooo_rw), UCS_CONFIG_TYPE_BOOL},
#endif

  {"FENCE", "auto",
   "IB fence type when an API fence is requested:\n"
   "  none   - fence is a no-op\n"
   "  weak   - fence makes sure remote reads are ordered with respect to remote writes\n"
   "  auto   - select fence mode based on hardware capabilities",
   ucs_offsetof(uct_rc_iface_common_config_t, fence_mode),
                UCS_CONFIG_TYPE_ENUM(uct_rc_fence_mode_values)},

  {"TX_NUM_GET_OPS", "",
   "This configuration parameter is replaced by UCX_RC_TX_NUM_GET_BYTES.",
   UCS_CONFIG_DEPRECATED_FIELD_OFFSET, UCS_CONFIG_TYPE_DEPRECATED},

  {"MAX_GET_ZCOPY", "auto",
   "Maximal size of get operation with zcopy protocol.",
   ucs_offsetof(uct_rc_iface_common_config_t, tx.max_get_zcopy), UCS_CONFIG_TYPE_MEMUNITS},

  {"TX_NUM_GET_BYTES", "inf",
   "Maximal number of bytes simultaneously transferred by get/RDMA_READ operations.",
   ucs_offsetof(uct_rc_iface_common_config_t, tx.max_get_bytes), UCS_CONFIG_TYPE_MEMUNITS},

  {NULL}
};
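
/*
 * Usage sketch (illustrative, not part of this table): once registered with
 * the "RC_" prefix from uct_rc_iface_config_table below, these fields are
 * normally controlled through environment variables, e.g.:
 *
 *   UCX_RC_TIMEOUT=2.0s UCX_RC_RETRY_COUNT=7 UCX_RC_FC_WND_SIZE=1024 ./app
 *
 * The exact variable names depend on the prefix the transport component
 * passes when registering this table.
 */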


/* Config relevant for rc_mlx5 and rc_verbs only (not for dc) */
ucs_config_field_t uct_rc_iface_config_table[] = {
  {"RC_", "MAX_NUM_EPS=256", NULL,
   ucs_offsetof(uct_rc_iface_config_t, super),
   UCS_CONFIG_TYPE_TABLE(uct_rc_iface_common_config_table)},

  {"FC_SOFT_THRESH", "0.5",
   "Threshold for sending a soft request for FC credits to the peer. This value\n"
   "is a fraction of the FC_WND_SIZE value (must be > FC_HARD_THRESH and < 1).",
   ucs_offsetof(uct_rc_iface_config_t, soft_thresh), UCS_CONFIG_TYPE_DOUBLE},

  {"TX_CQ_MODERATION", "64",
   "Maximum number of send WQEs which can be posted without requesting a completion.",
   ucs_offsetof(uct_rc_iface_config_t, tx_cq_moderation), UCS_CONFIG_TYPE_UINT},

  {"TX_CQ_LEN", "4096",
   "Length of the send completion queue. This limits the total number of outstanding signaled sends.",
   ucs_offsetof(uct_rc_iface_config_t, tx_cq_len), UCS_CONFIG_TYPE_UINT},

  {NULL}
};


#ifdef ENABLE_STATS
static ucs_stats_class_t uct_rc_iface_stats_class = {
    .name = "rc_iface",
    .num_counters = UCT_RC_IFACE_STAT_LAST,
    .counter_names = {
        [UCT_RC_IFACE_STAT_RX_COMPLETION] = "rx_completion",
        [UCT_RC_IFACE_STAT_TX_COMPLETION] = "tx_completion",
        [UCT_RC_IFACE_STAT_NO_CQE]        = "no_cqe",
        [UCT_RC_IFACE_STAT_NO_READS]      = "no_reads"
    }
};
#endif /* ENABLE_STATS */


static ucs_mpool_ops_t uct_rc_fc_pending_mpool_ops = {
    .chunk_alloc   = ucs_mpool_chunk_malloc,
    .chunk_release = ucs_mpool_chunk_free,
    .obj_init      = NULL,
    .obj_cleanup   = NULL
};

static void
uct_rc_iface_flush_comp_init(ucs_mpool_t *mp, void *obj, void *chunk)
{
    uct_rc_iface_t *iface      = ucs_container_of(mp, uct_rc_iface_t, tx.flush_mp);
    uct_rc_iface_send_op_t *op = obj;

    op->handler = uct_rc_ep_flush_op_completion_handler;
    op->flags   = 0;
    op->iface   = iface;
}

static ucs_mpool_ops_t uct_rc_flush_comp_mpool_ops = {
    .chunk_alloc   = ucs_mpool_chunk_malloc,
    .chunk_release = ucs_mpool_chunk_free,
    .obj_init      = uct_rc_iface_flush_comp_init,
    .obj_cleanup   = NULL
};

ucs_status_t uct_rc_iface_query(uct_rc_iface_t *iface,
                                uct_iface_attr_t *iface_attr,
                                size_t put_max_short, size_t max_inline,
                                size_t am_max_hdr, size_t am_max_iov,
                                size_t am_min_hdr, size_t rma_max_iov)
{
    uct_ib_device_t *dev = uct_ib_iface_device(&iface->super);
    ucs_status_t status;

    status = uct_ib_iface_query(&iface->super,
                                ucs_max(sizeof(uct_rc_hdr_t), UCT_IB_RETH_LEN),
                                iface_attr);
    if (status != UCS_OK) {
        return status;
    }

    iface_attr->iface_addr_len  = 0;
    iface_attr->max_conn_priv   = 0;
    iface_attr->cap.flags       = UCT_IFACE_FLAG_AM_BCOPY        |
                                  UCT_IFACE_FLAG_AM_ZCOPY        |
                                  UCT_IFACE_FLAG_PUT_BCOPY       |
                                  UCT_IFACE_FLAG_PUT_ZCOPY       |
                                  UCT_IFACE_FLAG_GET_BCOPY       |
                                  UCT_IFACE_FLAG_GET_ZCOPY       |
                                  UCT_IFACE_FLAG_PENDING         |
                                  UCT_IFACE_FLAG_CONNECT_TO_EP   |
                                  UCT_IFACE_FLAG_CB_SYNC;
    iface_attr->cap.event_flags = UCT_IFACE_FLAG_EVENT_SEND_COMP |
                                  UCT_IFACE_FLAG_EVENT_RECV      |
                                  UCT_IFACE_FLAG_EVENT_FD;

    if (uct_ib_device_has_pci_atomics(dev)) {
        if (dev->pci_fadd_arg_sizes & sizeof(uint64_t)) {
            iface_attr->cap.atomic64.op_flags  |= UCS_BIT(UCT_ATOMIC_OP_ADD);
            iface_attr->cap.atomic64.fop_flags |= UCS_BIT(UCT_ATOMIC_OP_ADD);
        }
        if (dev->pci_cswap_arg_sizes & sizeof(uint64_t)) {
            iface_attr->cap.atomic64.fop_flags |= UCS_BIT(UCT_ATOMIC_OP_CSWAP);
        }
        iface_attr->cap.flags                  |= UCT_IFACE_FLAG_ATOMIC_CPU;
    } else {
        if (dev->atomic_arg_sizes & sizeof(uint64_t)) {
            /* TODO: remove deprecated flags */
            iface_attr->cap.flags              |= UCT_IFACE_FLAG_ATOMIC_DEVICE;

            iface_attr->cap.atomic64.op_flags  |= UCS_BIT(UCT_ATOMIC_OP_ADD);
            iface_attr->cap.atomic64.fop_flags |= UCS_BIT(UCT_ATOMIC_OP_ADD)  |
                                                  UCS_BIT(UCT_ATOMIC_OP_CSWAP);
        }
    }

    iface_attr->cap.put.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
    iface_attr->cap.get.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
    iface_attr->cap.am.opt_zcopy_align  = UCS_SYS_PCI_MAX_PAYLOAD;
    iface_attr->cap.put.align_mtu = uct_ib_mtu_value(iface->super.config.path_mtu);
    iface_attr->cap.get.align_mtu = uct_ib_mtu_value(iface->super.config.path_mtu);
    iface_attr->cap.am.align_mtu  = uct_ib_mtu_value(iface->super.config.path_mtu);

    /* PUT */
    iface_attr->cap.put.max_short = put_max_short;
    iface_attr->cap.put.max_bcopy = iface->super.config.seg_size;
    iface_attr->cap.put.min_zcopy = 0;
    iface_attr->cap.put.max_zcopy = uct_ib_iface_port_attr(&iface->super)->max_msg_sz;
    iface_attr->cap.put.max_iov   = rma_max_iov;

    /* GET */
    iface_attr->cap.get.max_bcopy = iface->super.config.seg_size;
    iface_attr->cap.get.min_zcopy = iface->super.config.max_inl_cqe[UCT_IB_DIR_TX] + 1;
    iface_attr->cap.get.max_zcopy = iface->config.max_get_zcopy;
    iface_attr->cap.get.max_iov   = rma_max_iov;

    /* AM */
    iface_attr->cap.am.max_short  = uct_ib_iface_hdr_size(max_inline, am_min_hdr);
    iface_attr->cap.am.max_bcopy  = iface->super.config.seg_size - am_min_hdr;
    iface_attr->cap.am.min_zcopy  = 0;
    iface_attr->cap.am.max_zcopy  = iface->super.config.seg_size - am_min_hdr;
    iface_attr->cap.am.max_hdr    = am_max_hdr - am_min_hdr;
    iface_attr->cap.am.max_iov    = am_max_iov;

    /* Error Handling */
    iface_attr->cap.flags        |= UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;

    if (iface_attr->cap.am.max_short) {
        iface_attr->cap.flags |= UCT_IFACE_FLAG_AM_SHORT;
    }

    if (iface_attr->cap.put.max_short) {
        iface_attr->cap.flags |= UCT_IFACE_FLAG_PUT_SHORT;
    }

    return UCS_OK;
}

void uct_rc_iface_add_qp(uct_rc_iface_t *iface, uct_rc_ep_t *ep,
                         unsigned qp_num)
{
    uct_rc_ep_t ***ptr, **memb;

    ptr = &iface->eps[qp_num >> UCT_RC_QP_TABLE_ORDER];
    if (*ptr == NULL) {
        *ptr = ucs_calloc(UCS_BIT(UCT_RC_QP_TABLE_MEMB_ORDER), sizeof(**ptr),
                          "rc qp table");
    }

    memb = &(*ptr)[qp_num &  UCS_MASK(UCT_RC_QP_TABLE_MEMB_ORDER)];
    ucs_assert(*memb == NULL);
    *memb = ep;
}

void uct_rc_iface_remove_qp(uct_rc_iface_t *iface, unsigned qp_num)
{
    uct_rc_ep_t **memb;

    memb = &iface->eps[qp_num >> UCT_RC_QP_TABLE_ORDER]
                      [qp_num &  UCS_MASK(UCT_RC_QP_TABLE_MEMB_ORDER)];
    ucs_assert(*memb != NULL);
    *memb = NULL;
}
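
/* Layout note (illustrative): the eps table is a two-level array indexed by
 * QP number. eps[qp_num >> UCT_RC_QP_TABLE_ORDER] selects a lazily allocated
 * second-level chunk, and qp_num & UCS_MASK(UCT_RC_QP_TABLE_MEMB_ORDER)
 * selects the slot within it. Assuming the 12/12 bit split defined in
 * rc_iface.h, qp_num 0x12345 maps to eps[0x12][0x345]. */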

ucs_status_t uct_rc_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                uct_completion_t *comp)
{
    uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);
    ucs_status_t status;
    unsigned count;
    uct_rc_ep_t *ep;

    if (comp != NULL) {
        return UCS_ERR_UNSUPPORTED;
    }

    status = uct_rc_iface_fence_relaxed_order(tl_iface);
    if (status != UCS_OK) {
        return status;
    }

    count = 0;
    ucs_list_for_each(ep, &iface->ep_list, list) {
        status = uct_ep_flush(&ep->super.super, 0, NULL);
        if ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS)) {
            ++count;
        } else if (status != UCS_OK) {
            return status;
        }
    }

    if (count != 0) {
        UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
        return UCS_INPROGRESS;
    }

    UCT_TL_IFACE_STAT_FLUSH(&iface->super.super);
    return UCS_OK;
}

void uct_rc_iface_send_desc_init(uct_iface_h tl_iface, void *obj, uct_mem_h memh)
{
    uct_rc_iface_send_desc_t *desc = obj;

    desc->lkey        = uct_ib_memh_get_lkey(memh);
    desc->super.flags = 0;
}

ucs_status_t uct_rc_init_fc_thresh(uct_rc_iface_config_t *config,
                                   uct_rc_iface_t *iface)
{
    /* Check FC parameters correctness */
    if ((config->soft_thresh <= config->super.fc.hard_thresh) ||
        (config->soft_thresh >= 1)) {
        ucs_error("The factor for the soft FC threshold must be greater than"
                  " the FC_HARD_THRESH value and less than 1 (s=%f, h=%f)",
                  config->soft_thresh, config->super.fc.hard_thresh);
        return UCS_ERR_INVALID_PARAM;
    }

    if (config->super.fc.enable) {
        iface->config.fc_soft_thresh = ucs_max((int)(iface->config.fc_wnd_size *
                                               config->soft_thresh), 1);
    } else {
        iface->config.fc_soft_thresh = 0;
    }
    return UCS_OK;
}
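
/* Worked example: with the defaults FC_WND_SIZE=512, FC_SOFT_THRESH=0.5 and
 * FC_HARD_THRESH=0.25 (and assuming RX_QUEUE_LEN >= 512, so the window is not
 * clamped in the interface init below), fc_wnd_size = 512,
 * fc_soft_thresh = 256 and fc_hard_thresh = 128. */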

ucs_status_t uct_rc_iface_fc_handler(uct_rc_iface_t *iface, unsigned qp_num,
                                     uct_rc_hdr_t *hdr, unsigned length,
                                     uint32_t imm_data, uint16_t lid, unsigned flags)
{
    ucs_status_t status;
    int16_t      cur_wnd;
    uct_rc_fc_request_t *fc_req;
    uct_rc_ep_t  *ep  = uct_rc_iface_lookup_ep(iface, qp_num);
    uint8_t fc_hdr    = uct_rc_fc_get_fc_hdr(hdr->am_id);

    ucs_assert(iface->config.fc_enabled);

    if (fc_hdr & UCT_RC_EP_FC_FLAG_GRANT) {
        UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_GRANT, 1);

        /* Got either grant flag or special FC grant message */
        cur_wnd = ep->fc.fc_wnd;

        /* Peer granted resources, so update wnd */
        ep->fc.fc_wnd = iface->config.fc_wnd_size;
        UCS_STATS_SET_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_FC_WND, ep->fc.fc_wnd);

        /* To preserve ordering we have to dispatch all pending
         * operations if current fc_wnd is <= 0
         * (otherwise it will be dispatched by tx progress) */
        if (cur_wnd <= 0) {
            ucs_arbiter_group_schedule(&iface->tx.arbiter, &ep->arb_group);
            ucs_arbiter_dispatch(&iface->tx.arbiter, 1,
                                 uct_rc_ep_process_pending, NULL);
        }
        if (fc_hdr == UCT_RC_EP_FC_PURE_GRANT) {
            /* Special FC grant message can't be bundled with any other FC
             * request. Stop processing this AM and do not invoke AM handler */
            UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_PURE_GRANT, 1);
            return UCS_OK;
        }
    }

    if (fc_hdr & UCT_RC_EP_FC_FLAG_SOFT_REQ) {
        UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_SOFT_REQ, 1);

        /* Got soft credit request. Mark ep that it needs to grant
         * credits to the peer in outgoing AM (if any). */
        ep->fc.flags |= UCT_RC_EP_FC_FLAG_GRANT;

    } else if (fc_hdr & UCT_RC_EP_FC_FLAG_HARD_REQ) {
        UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_HARD_REQ, 1);
        fc_req = ucs_mpool_get(&iface->tx.fc_mp);
        if (ucs_unlikely(fc_req == NULL)) {
            ucs_error("Failed to allocate FC request. "
                      "Grant will not be sent on ep %p", ep);
            return UCS_ERR_NO_MEMORY;
        }
        fc_req->ep         = &ep->super.super;
        fc_req->super.func = uct_rc_ep_fc_grant;

        /* Got hard credit request. Send grant to the peer immediately */
        status = uct_rc_ep_fc_grant(&fc_req->super);

        if (status == UCS_ERR_NO_RESOURCE) {
            /* force add request to group & schedule group to eliminate
             * FC deadlock */
            uct_pending_req_arb_group_push_head(&iface->tx.arbiter,
                                                &ep->arb_group, &fc_req->super);
            ucs_arbiter_group_schedule(&iface->tx.arbiter, &ep->arb_group);
        } else {
            ucs_assertv_always(status == UCS_OK, "Failed to send FC grant msg: %s",
                               ucs_status_string(status));
        }
    }

    return uct_iface_invoke_am(&iface->super.super,
                               (hdr->am_id & ~UCT_RC_EP_FC_MASK),
                               hdr + 1, length, flags);
}
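
/* Note: FC control information is piggybacked on the AM ID. The bits covered
 * by UCT_RC_EP_FC_MASK in hdr->am_id carry the UCT_RC_EP_FC_FLAG_* flags
 * (extracted above via uct_rc_fc_get_fc_hdr()), while the remaining bits,
 * recovered with ~UCT_RC_EP_FC_MASK, hold the user AM ID that is finally
 * dispatched. */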

static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface)
{
    const unsigned count = iface->config.tx_ops_count;
    uct_rc_iface_send_op_t *op;
    ucs_status_t status;

    iface->tx.ops_buffer = ucs_calloc(count, sizeof(*iface->tx.ops_buffer),
                                      "rc_tx_ops");
    if (iface->tx.ops_buffer == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    iface->tx.free_ops = &iface->tx.ops_buffer[0];
    for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count; ++op) {
        op->handler = uct_rc_ep_send_op_completion_handler;
        op->flags   = UCT_RC_IFACE_SEND_OP_FLAG_IFACE;
        op->iface   = iface;
        op->next    = (op == (iface->tx.ops_buffer + count - 1)) ? NULL : (op + 1);
    }

    /* Create memory pool for flush completions. Can't just allocate a buffer
     * of a certain size, because the number of simultaneous flushes is not
     * limited by CQ or QP resources. */
    status = ucs_mpool_init(&iface->tx.flush_mp, 0, sizeof(*op), 0,
                            UCS_SYS_CACHE_LINE_SIZE, 256,
                            UINT_MAX, &uct_rc_flush_comp_mpool_ops,
                            "flush-comps-only");

    return status;
}
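
/* The ops buffer initialized above forms a singly-linked LIFO free list
 * threaded through op->next, with tx.free_ops pointing at the next available
 * element. (The matching O(1) get/put helpers are assumed to be inline
 * functions in rc_iface.h.) */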

static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface)
{
    const unsigned total_count = iface->config.tx_ops_count;
    uct_rc_iface_send_op_t *op;
    unsigned free_count;

    free_count = 0;
    for (op = iface->tx.free_ops; op != NULL; op = op->next) {
        ++free_count;
        ucs_assert(free_count <= total_count);
    }
    if (free_count != total_count) {
        ucs_warn("rc_iface %p: %u/%u send ops were not released", iface,
                 total_count - free_count, total_count);
    }
    ucs_free(iface->tx.ops_buffer);

    ucs_mpool_cleanup(&iface->tx.flush_mp, 1);
}

unsigned uct_rc_iface_do_progress(uct_iface_h tl_iface)
{
    uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);
    return iface->progress(iface);
}

ucs_status_t uct_rc_iface_init_rx(uct_rc_iface_t *iface,
                                  const uct_rc_iface_common_config_t *config,
                                  struct ibv_srq **srq_p)
{
    struct ibv_srq_init_attr srq_init_attr;
    struct ibv_pd *pd = uct_ib_iface_md(&iface->super)->pd;
    struct ibv_srq *srq;

    srq_init_attr.attr.max_sge   = 1;
    srq_init_attr.attr.max_wr    = config->super.rx.queue_len;
    srq_init_attr.attr.srq_limit = 0;
    srq_init_attr.srq_context    = iface;
    srq                          = ibv_create_srq(pd, &srq_init_attr);
    if (srq == NULL) {
        ucs_error("ibv_create_srq() failed: %m");
        return UCS_ERR_IO_ERROR;
    }
    iface->rx.srq.quota          = srq_init_attr.attr.max_wr;
    *srq_p                       = srq;

    return UCS_OK;
}
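
/* rx.srq.quota is taken from the max_wr value written back into srq_init_attr
 * by ibv_create_srq() (the driver may round the requested capacity up); it is
 * assumed to be consumed elsewhere in this transport as receive buffers are
 * posted to the SRQ. */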

static int uct_rc_iface_config_limit_value(const char *name,
                                           int provided, int limit)
{
    if (provided > limit) {
        ucs_warn("using maximal value for %s (%d) instead of %d",
                 name, limit, provided);
        return limit;
    } else {
        return provided;
    }
}

UCS_CLASS_INIT_FUNC(uct_rc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md,
                    uct_worker_h worker, const uct_iface_params_t *params,
                    const uct_rc_iface_common_config_t *config,
                    uct_ib_iface_init_attr_t *init_attr)
{
    uct_ib_device_t *dev = &ucs_derived_of(md, uct_ib_md_t)->dev;
    uint32_t max_ib_msg_size;
    ucs_status_t status;

    UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker, params,
                              &config->super, init_attr);

    self->tx.cq_available           = init_attr->cq_len[UCT_IB_DIR_TX] - 1;
    self->rx.srq.available          = 0;
    self->rx.srq.quota              = 0;
    self->config.tx_qp_len          = config->super.tx.queue_len;
    self->config.tx_min_sge         = config->super.tx.min_sge;
    self->config.tx_min_inline      = config->super.tx.min_inline;
    self->config.tx_ops_count       = init_attr->cq_len[UCT_IB_DIR_TX];
    self->config.min_rnr_timer      = uct_ib_to_rnr_fabric_time(config->tx.rnr_timeout);
    self->config.timeout            = uct_ib_to_qp_fabric_time(config->tx.timeout);
    self->config.rnr_retry          = uct_rc_iface_config_limit_value(
                                                  "RNR_RETRY_COUNT",
                                                  config->tx.rnr_retry_count,
                                                  UCT_RC_QP_MAX_RETRY_COUNT);
    self->config.retry_cnt          = uct_rc_iface_config_limit_value(
                                                  "RETRY_COUNT",
                                                  config->tx.retry_count,
                                                  UCT_RC_QP_MAX_RETRY_COUNT);
    self->config.max_rd_atomic      = config->max_rd_atomic;
    self->config.ooo_rw             = config->ooo_rw;
#if UCS_ENABLE_ASSERT
    self->config.tx_cq_len          = init_attr->cq_len[UCT_IB_DIR_TX];
#endif
    max_ib_msg_size                 = uct_ib_iface_port_attr(&self->super)->max_msg_sz;

    if (config->tx.max_get_zcopy == UCS_MEMUNITS_AUTO) {
        self->config.max_get_zcopy = max_ib_msg_size;
    } else if (config->tx.max_get_zcopy <= max_ib_msg_size) {
        self->config.max_get_zcopy = config->tx.max_get_zcopy;
    } else {
        ucs_warn("rc_iface on %s:%d: reduced max_get_zcopy to %u",
                 uct_ib_device_name(dev), self->super.config.port_num,
                 max_ib_msg_size);
        self->config.max_get_zcopy = max_ib_msg_size;
    }

    if ((config->tx.max_get_bytes == UCS_MEMUNITS_INF) ||
        (config->tx.max_get_bytes == UCS_MEMUNITS_AUTO)) {
        self->tx.reads_available = SSIZE_MAX;
    } else {
        self->tx.reads_available = config->tx.max_get_bytes;
    }

    uct_ib_fence_info_init(&self->tx.fi);
    memset(self->eps, 0, sizeof(self->eps));
    ucs_arbiter_init(&self->tx.arbiter);
    ucs_list_head_init(&self->ep_list);

    /* Check FC parameters correctness */
    if ((config->fc.hard_thresh <= 0) || (config->fc.hard_thresh >= 1)) {
        ucs_error("The factor for the hard FC threshold must be > 0 and < 1 (%f)",
                  config->fc.hard_thresh);
        status = UCS_ERR_INVALID_PARAM;
        goto err;
    }

    /* Create RX buffers mempool */
    status = uct_ib_iface_recv_mpool_init(&self->super, &config->super,
                                          "rc_recv_desc", &self->rx.mp);
    if (status != UCS_OK) {
        goto err;
    }

    /* Create TX buffers mempool */
    status = uct_iface_mpool_init(&self->super.super,
                                  &self->tx.mp,
                                  sizeof(uct_rc_iface_send_desc_t) + self->super.config.seg_size,
                                  sizeof(uct_rc_iface_send_desc_t),
                                  UCS_SYS_CACHE_LINE_SIZE,
                                  &config->super.tx.mp,
                                  self->config.tx_qp_len,
                                  uct_rc_iface_send_desc_init,
                                  "rc_send_desc");
    if (status != UCS_OK) {
        goto err_destroy_rx_mp;
    }

    /* Allocate tx operations */
    status = uct_rc_iface_tx_ops_init(self);
    if (status != UCS_OK) {
        goto err_destroy_tx_mp;
    }

    /* Set atomic handlers according to atomic reply endianness */
    self->config.atomic64_handler = dev->atomic_arg_sizes_be & sizeof(uint64_t) ?
                                    uct_rc_ep_atomic_handler_64_be1 :
                                    uct_rc_ep_atomic_handler_64_be0;
    self->config.atomic32_ext_handler = dev->ext_atomic_arg_sizes_be & sizeof(uint32_t) ?
                                        uct_rc_ep_atomic_handler_32_be1 :
                                        uct_rc_ep_atomic_handler_32_be0;
    self->config.atomic64_ext_handler = dev->ext_atomic_arg_sizes_be & sizeof(uint64_t) ?
                                        uct_rc_ep_atomic_handler_64_be1 :
                                        uct_rc_ep_atomic_handler_64_be0;

    status = UCS_STATS_NODE_ALLOC(&self->stats, &uct_rc_iface_stats_class,
                                  self->super.super.stats);
    if (status != UCS_OK) {
        goto err_cleanup_tx_ops;
    }

    /* Initialize RX resources (SRQ) */
    status = ops->init_rx(self, config);
    if (status != UCS_OK) {
        goto err_destroy_stats;
    }

    self->config.fc_enabled      = config->fc.enable;

    if (self->config.fc_enabled) {
        /* Assume that the number of recv buffers is the same on all peers.
         * Then the FC window size is the same for all endpoints as well.
         * TODO: Make the wnd size a property of the particular interface.
         * We could then distribute it via the rc address. */
        self->config.fc_wnd_size     = ucs_min(config->fc.wnd_size,
                                               config->super.rx.queue_len);
        self->config.fc_hard_thresh  = ucs_max((int)(self->config.fc_wnd_size *
                                               config->fc.hard_thresh), 1);

        /* Create mempool for pending requests for FC grant */
        status = ucs_mpool_init(&self->tx.fc_mp,
                                0,
                                init_attr->fc_req_size,
                                0,
                                1,
                                128,
                                UINT_MAX,
                                &uct_rc_fc_pending_mpool_ops,
                                "pending-fc-grants-only");
        if (status != UCS_OK) {
            goto err_cleanup_rx;
        }
    } else {
        self->config.fc_wnd_size     = INT16_MAX;
        self->config.fc_hard_thresh  = 0;
    }

    return UCS_OK;

err_cleanup_rx:
    ops->cleanup_rx(self);
err_destroy_stats:
    UCS_STATS_NODE_FREE(self->stats);
err_cleanup_tx_ops:
    uct_rc_iface_tx_ops_cleanup(self);
err_destroy_tx_mp:
    ucs_mpool_cleanup(&self->tx.mp, 1);
err_destroy_rx_mp:
    ucs_mpool_cleanup(&self->rx.mp, 1);
err:
    return status;
}

static UCS_CLASS_CLEANUP_FUNC(uct_rc_iface_t)
{
    uct_rc_iface_ops_t *ops = ucs_derived_of(self->super.ops, uct_rc_iface_ops_t);
    unsigned i;

    /* Release the QP table. TODO: release on-demand when removing an ep. */
    for (i = 0; i < UCT_RC_QP_TABLE_SIZE; ++i) {
        ucs_free(self->eps[i]);
    }

    if (!ucs_list_is_empty(&self->ep_list)) {
        ucs_warn("some eps were not destroyed");
    }

    ucs_arbiter_cleanup(&self->tx.arbiter);

    UCS_STATS_NODE_FREE(self->stats);

    ops->cleanup_rx(self);
    uct_rc_iface_tx_ops_cleanup(self);
    ucs_mpool_cleanup(&self->tx.mp, 1);
    ucs_mpool_cleanup(&self->rx.mp, 0); /* Cannot flush SRQ */
    if (self->config.fc_enabled) {
        ucs_mpool_cleanup(&self->tx.fc_mp, 1);
    }
}

UCS_CLASS_DEFINE(uct_rc_iface_t, uct_ib_iface_t);

void uct_rc_iface_fill_attr(uct_rc_iface_t *iface, uct_ib_qp_attr_t *attr,
                            unsigned max_send_wr, struct ibv_srq *srq)
{
    attr->srq                        = srq;
    attr->cap.max_send_wr            = max_send_wr;
    attr->cap.max_recv_wr            = 0;
    attr->cap.max_send_sge           = iface->config.tx_min_sge;
    attr->cap.max_recv_sge           = 1;
    attr->cap.max_inline_data        = iface->config.tx_min_inline;
    attr->qp_type                    = iface->super.config.qp_type;
    attr->sq_sig_all                 = !iface->config.tx_moderation;
    attr->max_inl_cqe[UCT_IB_DIR_RX] = iface->super.config.max_inl_cqe[UCT_IB_DIR_RX];
    attr->max_inl_cqe[UCT_IB_DIR_TX] = iface->super.config.max_inl_cqe[UCT_IB_DIR_TX];
}

ucs_status_t uct_rc_iface_qp_create(uct_rc_iface_t *iface, struct ibv_qp **qp_p,
                                    uct_ib_qp_attr_t *attr, unsigned max_send_wr,
                                    struct ibv_srq *srq)
{
    uct_rc_iface_fill_attr(iface, attr, max_send_wr, srq);
    uct_ib_iface_fill_attr(&iface->super, attr);

    return uct_ib_iface_create_qp(&iface->super, attr, qp_p);
}

ucs_status_t uct_rc_iface_qp_init(uct_rc_iface_t *iface, struct ibv_qp *qp)
{
    struct ibv_qp_attr qp_attr;
    int ret;

    memset(&qp_attr, 0, sizeof(qp_attr));

    qp_attr.qp_state              = IBV_QPS_INIT;
    qp_attr.pkey_index            = iface->super.pkey_index;
    qp_attr.port_num              = iface->super.config.port_num;
    qp_attr.qp_access_flags       = IBV_ACCESS_LOCAL_WRITE  |
                                    IBV_ACCESS_REMOTE_WRITE |
                                    IBV_ACCESS_REMOTE_READ  |
                                    IBV_ACCESS_REMOTE_ATOMIC;
    ret = ibv_modify_qp(qp, &qp_attr,
                        IBV_QP_STATE      |
                        IBV_QP_PKEY_INDEX |
                        IBV_QP_PORT       |
                        IBV_QP_ACCESS_FLAGS);
    if (ret) {
        ucs_error("error modifying QP to INIT: %m");
        return UCS_ERR_IO_ERROR;
    }

    return UCS_OK;
}

ucs_status_t uct_rc_iface_qp_connect(uct_rc_iface_t *iface, struct ibv_qp *qp,
                                     const uint32_t dest_qp_num,
                                     struct ibv_ah_attr *ah_attr,
                                     enum ibv_mtu path_mtu)
{
#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
    struct ibv_exp_qp_attr qp_attr;
    uct_ib_device_t *dev;
#else
    struct ibv_qp_attr qp_attr;
#endif
    long qp_attr_mask;
    int ret;

    ucs_assert(path_mtu != 0);

    memset(&qp_attr, 0, sizeof(qp_attr));

    qp_attr.qp_state              = IBV_QPS_RTR;
    qp_attr.dest_qp_num           = dest_qp_num;
    qp_attr.rq_psn                = 0;
    qp_attr.path_mtu              = path_mtu;
    qp_attr.max_dest_rd_atomic    = iface->config.max_rd_atomic;
    qp_attr.min_rnr_timer         = iface->config.min_rnr_timer;
    qp_attr.ah_attr               = *ah_attr;
    qp_attr_mask                  = IBV_QP_STATE              |
                                    IBV_QP_AV                 |
                                    IBV_QP_PATH_MTU           |
                                    IBV_QP_DEST_QPN           |
                                    IBV_QP_RQ_PSN             |
                                    IBV_QP_MAX_DEST_RD_ATOMIC |
                                    IBV_QP_MIN_RNR_TIMER;

#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
    dev = uct_ib_iface_device(&iface->super);
    if (iface->config.ooo_rw && UCX_IB_DEV_IS_OOO_SUPPORTED(dev, rc)) {
        ucs_debug("enabling out-of-order on RC QP %x dev %s",
                  qp->qp_num, uct_ib_device_name(dev));
        qp_attr_mask |= IBV_EXP_QP_OOO_RW_DATA_PLACEMENT;
    }
    ret = ibv_exp_modify_qp(qp, &qp_attr, qp_attr_mask);
#else
    ret = ibv_modify_qp(qp, &qp_attr, qp_attr_mask);
#endif
    if (ret) {
        ucs_error("error modifying QP to RTR: %m");
        return UCS_ERR_IO_ERROR;
    }

    qp_attr.qp_state              = IBV_QPS_RTS;
    qp_attr.sq_psn                = 0;
    qp_attr.timeout               = iface->config.timeout;
    qp_attr.rnr_retry             = iface->config.rnr_retry;
    qp_attr.retry_cnt             = iface->config.retry_cnt;
    qp_attr.max_rd_atomic         = iface->config.max_rd_atomic;
    qp_attr_mask                  = IBV_QP_STATE              |
                                    IBV_QP_TIMEOUT            |
                                    IBV_QP_RETRY_CNT          |
                                    IBV_QP_RNR_RETRY          |
                                    IBV_QP_SQ_PSN             |
                                    IBV_QP_MAX_QP_RD_ATOMIC;

#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
    ret = ibv_exp_modify_qp(qp, &qp_attr, qp_attr_mask);
#else
    ret = ibv_modify_qp(qp, &qp_attr, qp_attr_mask);
#endif
    if (ret) {
        ucs_error("error modifying QP to RTS: %m");
        return UCS_ERR_IO_ERROR;
    }

    ucs_debug("connected rc qp 0x%x on "UCT_IB_IFACE_FMT" to lid %d(+%d) sl %d "
              "remote_qp 0x%x mtu %zu timer %dx%d rnr %dx%d rd_atom %d",
              qp->qp_num, UCT_IB_IFACE_ARG(&iface->super), ah_attr->dlid,
              ah_attr->src_path_bits, ah_attr->sl, qp_attr.dest_qp_num,
              uct_ib_mtu_value(qp_attr.path_mtu), qp_attr.timeout,
              qp_attr.retry_cnt, qp_attr.min_rnr_timer, qp_attr.rnr_retry,
              qp_attr.max_rd_atomic);

    return UCS_OK;
}
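
/* Typical connection sequence built from the helpers above (a sketch with
 * error handling omitted; remote_qp_num, ah_attr and path_mtu would come from
 * the peer's endpoint address exchange, see rc_ep.c):
 *
 *   uct_ib_qp_attr_t attr = {0};
 *   struct ibv_qp *qp;
 *
 *   uct_rc_iface_qp_create(iface, &qp, &attr, iface->config.tx_qp_len, srq);
 *   uct_rc_iface_qp_init(iface, qp);                       // RESET -> INIT
 *   uct_rc_iface_qp_connect(iface, qp, remote_qp_num,
 *                           &ah_attr, path_mtu);           // INIT -> RTR -> RTS
 */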

ucs_status_t uct_rc_iface_common_event_arm(uct_iface_h tl_iface,
                                           unsigned events, int force_rx_all)
{
    uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);
    int arm_rx_solicited, arm_rx_all;
    ucs_status_t status;

    status = uct_ib_iface_pre_arm(&iface->super);
    if (status != UCS_OK) {
        return status;
    }

    if (events & UCT_EVENT_SEND_COMP) {
        status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_TX, 0);
        if (status != UCS_OK) {
            return status;
        }
    }

    arm_rx_solicited = 0;
    arm_rx_all       = 0;
    if (events & UCT_EVENT_RECV) {
        arm_rx_solicited = 1; /* to wake up on active messages */
    }
    if (((events & UCT_EVENT_SEND_COMP) && iface->config.fc_enabled) ||
        force_rx_all) {
        arm_rx_all       = 1; /* to wake up on FC grants (or if forced) */
    }

    if (arm_rx_solicited || arm_rx_all) {
        status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_RX,
                                          arm_rx_solicited && !arm_rx_all);
        if (status != UCS_OK) {
            return status;
        }
    }

    return UCS_OK;
}

ucs_status_t uct_rc_iface_event_arm(uct_iface_h tl_iface, unsigned events)
{
    return uct_rc_iface_common_event_arm(tl_iface, events, 0);
}

ucs_status_t uct_rc_iface_fence(uct_iface_h tl_iface, unsigned flags)
{
    uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);

    if (iface->config.fence_mode != UCT_RC_FENCE_MODE_NONE) {
        iface->tx.fi.fence_beat++;
    }

    UCT_TL_IFACE_STAT_FENCE(&iface->super.super);
    return UCS_OK;
}