/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "ud_iface.h"
#include "ud_ep.h"
#include "ud_inl.h"

#include <ucs/arch/cpu.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/type/class.h>
#include <ucs/datastruct/queue.h>
#include <sys/poll.h>


#ifdef ENABLE_STATS
static ucs_stats_class_t uct_ud_iface_stats_class = {
    .name = "ud_iface",
    .num_counters = UCT_UD_IFACE_STAT_LAST,
    .counter_names = {
        [UCT_UD_IFACE_STAT_RX_DROP] = "rx_drop"
    }
};
#endif

/* cppcheck-suppress ctunullpointer */
SGLIB_DEFINE_LIST_FUNCTIONS(uct_ud_iface_peer_t, uct_ud_iface_peer_cmp, next)
SGLIB_DEFINE_HASHED_CONTAINER_FUNCTIONS(uct_ud_iface_peer_t,
                                        UCT_UD_HASH_SIZE,
                                        uct_ud_iface_peer_hash)

static void uct_ud_iface_free_pending_rx(uct_ud_iface_t *iface);
static void uct_ud_iface_free_async_comps(uct_ud_iface_t *iface);


void uct_ud_iface_cep_init(uct_ud_iface_t *iface)
{
    sglib_hashed_uct_ud_iface_peer_t_init(iface->peers);
}

static void
uct_ud_iface_cep_cleanup_eps(uct_ud_iface_t *iface, uct_ud_iface_peer_t *peer)
{
    uct_ud_ep_t *ep, *tmp;

    ucs_list_for_each_safe(ep, tmp, &peer->ep_list, cep_list) {
        if (ep->conn_id < peer->conn_id_last) {
            /* active connections should have been cleaned up by their owner */
            ucs_warn("iface (%p) peer (qpn=%d lid=%d) cleanup with %d endpoints still active",
                     iface, peer->dst_qpn, peer->dlid,
                     (int)ucs_list_length(&peer->ep_list));
            continue;
        }
        ucs_list_del(&ep->cep_list);
        ucs_trace("cep:ep_destroy(%p) conn_id %d", ep, ep->conn_id);
        uct_ep_destroy(&ep->super.super);
    }
}

void uct_ud_iface_cep_cleanup(uct_ud_iface_t *iface)
{
    uct_ud_iface_peer_t *peer;
    struct sglib_hashed_uct_ud_iface_peer_t_iterator it_peer;

    for (peer = sglib_hashed_uct_ud_iface_peer_t_it_init(&it_peer,
                                                         iface->peers);
         peer != NULL;
         peer = sglib_hashed_uct_ud_iface_peer_t_it_next(&it_peer)) {

        uct_ud_iface_cep_cleanup_eps(iface, peer);
        free(peer);
    }
}

static uct_ud_iface_peer_t *
uct_ud_iface_cep_lookup_addr(uct_ud_iface_t *iface, uint16_t dlid,
                             const union ibv_gid *dgid, uint32_t dest_qpn,
                             int path_index)
{
    uct_ud_iface_peer_t key;

    key.dlid       = dlid;
    key.dgid       = *dgid;
    key.dst_qpn    = dest_qpn;
    key.path_index = path_index;
    return sglib_hashed_uct_ud_iface_peer_t_find_member(iface->peers, &key);
}

static uct_ud_iface_peer_t *
uct_ud_iface_cep_lookup_peer(uct_ud_iface_t *iface,
                             const uct_ib_address_t *src_ib_addr,
                             const uct_ud_iface_addr_t *src_if_addr,
                             int path_index)
{
    uint32_t dest_qpn = uct_ib_unpack_uint24(src_if_addr->qp_num);
    uct_ib_address_pack_params_t params;

    uct_ib_address_unpack(src_ib_addr, &params);
    return uct_ud_iface_cep_lookup_addr(iface, params.lid, &params.gid,
                                        dest_qpn, path_index);
}

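/* The per-peer endpoint list is kept sorted by conn_id in descending order
 * (see uct_ud_iface_cep_insert), so the scan below may stop at the first
 * entry whose conn_id is smaller than the one we are looking for. A conn_id
 * of UCT_UD_EP_CONN_ID_MAX means "the next connection to be created", i.e.
 * peer->conn_id_last. */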
static uct_ud_ep_t *
uct_ud_iface_cep_lookup_ep(uct_ud_iface_peer_t *peer, uint32_t conn_id)
{
    uint32_t id;
    uct_ud_ep_t *ep;

    if (conn_id != UCT_UD_EP_CONN_ID_MAX) {
        id = conn_id;
    } else {
        id = peer->conn_id_last;
        /* TODO: O(1) lookup in this case (new connection) */
    }
    ucs_list_for_each(ep, &peer->ep_list, cep_list) {
        if (ep->conn_id == id) {
            return ep;
        }
        if (ep->conn_id < id) {
            break;
        }
    }
    return NULL;
}

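/* Return the connection id an inserted ep should use: the explicitly
 * requested id, or, for a new connection (UCT_UD_EP_CONN_ID_MAX), the next
 * value of the peer's monotonically increasing counter. */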
static uint32_t
uct_ud_iface_cep_getid(uct_ud_iface_peer_t *peer, uint32_t conn_id)
{
    uint32_t new_id;

    if (conn_id != UCT_UD_EP_CONN_ID_MAX) {
        return conn_id;
    }
    new_id = peer->conn_id_last++;
    return new_id;
}

/* insert new ep that is connected to src_if_addr */
ucs_status_t uct_ud_iface_cep_insert(uct_ud_iface_t *iface,
                                     const uct_ib_address_t *src_ib_addr,
                                     const uct_ud_iface_addr_t *src_if_addr,
                                     uct_ud_ep_t *ep, uint32_t conn_id,
                                     int path_index)
{
    uint32_t dest_qpn = uct_ib_unpack_uint24(src_if_addr->qp_num);
    uct_ib_address_pack_params_t params;
    uct_ud_iface_peer_t *peer;
    uct_ud_ep_t *cep;

    uct_ib_address_unpack(src_ib_addr, &params);
    peer = uct_ud_iface_cep_lookup_addr(iface, params.lid, &params.gid,
                                        dest_qpn, path_index);
    if (peer == NULL) {
        peer = malloc(sizeof *peer);
        if (peer == NULL) {
            return UCS_ERR_NO_MEMORY;
        }

        peer->dlid       = params.lid;
        peer->dgid       = params.gid;
        peer->dst_qpn    = dest_qpn;
        peer->path_index = path_index;
        sglib_hashed_uct_ud_iface_peer_t_add(iface->peers, peer);
        ucs_list_head_init(&peer->ep_list);
        peer->conn_id_last = 0;
    }

    ep->conn_id = uct_ud_iface_cep_getid(peer, conn_id);
    if (ep->conn_id == UCT_UD_EP_CONN_ID_MAX) {
        return UCS_ERR_NO_RESOURCE;
    }

    if (ucs_list_is_empty(&peer->ep_list)) {
        ucs_list_add_head(&peer->ep_list, &ep->cep_list);
        return UCS_OK;
    }

    /* keep the list sorted by conn_id in descending order */
    ucs_list_for_each(cep, &peer->ep_list, cep_list) {
        ucs_assert_always(cep->conn_id != ep->conn_id);
        if (cep->conn_id < ep->conn_id) {
            ucs_list_insert_before(&cep->cep_list, &ep->cep_list);
            return UCS_OK;
        }
    }

    /* ep->conn_id is smaller than every existing id - append at the tail so
     * the ep is not silently left off the list */
    ucs_list_add_tail(&peer->ep_list, &ep->cep_list);
    return UCS_OK;
}

void uct_ud_iface_cep_remove(uct_ud_ep_t *ep)
{
    if (ucs_list_is_empty(&ep->cep_list)) {
        return;
    }
    ucs_trace("iface(%p) cep_remove:ep(%p)", ep->super.super.iface, ep);
    ucs_list_del(&ep->cep_list);
    ucs_list_head_init(&ep->cep_list);
}

uct_ud_ep_t *uct_ud_iface_cep_lookup(uct_ud_iface_t *iface,
                                     const uct_ib_address_t *src_ib_addr,
                                     const uct_ud_iface_addr_t *src_if_addr,
                                     uint32_t conn_id, int path_index)
{
    uct_ud_iface_peer_t *peer;
    uct_ud_ep_t *ep;

    peer = uct_ud_iface_cep_lookup_peer(iface, src_ib_addr, src_if_addr,
                                        path_index);
    if (peer == NULL) {
        return NULL;
    }

    ep = uct_ud_iface_cep_lookup_ep(peer, conn_id);
    if ((ep != NULL) && (conn_id == UCT_UD_EP_CONN_ID_MAX)) {
        peer->conn_id_last++;
    }
    return ep;
}

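/* Undo the most recent insert for this peer: the ep must own the last
 * allocated conn_id (as the asserts below verify), which is returned to the
 * peer's counter before the ep is unlinked from the list. */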
void uct_ud_iface_cep_rollback(uct_ud_iface_t *iface,
                               const uct_ib_address_t *src_ib_addr,
                               const uct_ud_iface_addr_t *src_if_addr,
                               uct_ud_ep_t *ep)
{
    uct_ud_iface_peer_t *peer;

    peer = uct_ud_iface_cep_lookup_peer(iface, src_ib_addr, src_if_addr,
                                        ep->path_index);
    ucs_assert_always(peer != NULL);
    ucs_assert_always(peer->conn_id_last > 0);
    ucs_assert_always(ep->conn_id + 1 == peer->conn_id_last);
    ucs_assert_always(!ucs_list_is_empty(&peer->ep_list));
    ucs_assert_always(!ucs_list_is_empty(&ep->cep_list));

    peer->conn_id_last--;
    uct_ud_iface_cep_remove(ep);
}

static void uct_ud_iface_send_skb_init(uct_iface_h tl_iface, void *obj,
                                       uct_mem_h memh)
{
    uct_ud_send_skb_t *skb = obj;

    skb->lkey  = uct_ib_memh_get_lkey(memh);
    skb->flags = UCT_UD_SEND_SKB_FLAG_INVALID;
}

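/* Create the UD QP and move it through the standard state sequence
 * RESET -> INIT -> RTR -> RTS. Unlike connected transports, a UD QP needs no
 * per-peer address at RTR; it only requires pkey/port/qkey at INIT and an
 * initial send queue PSN at RTS. */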
static ucs_status_t
uct_ud_iface_create_qp(uct_ud_iface_t *self, const uct_ud_iface_config_t *config)
{
    uct_ud_iface_ops_t *ops = ucs_derived_of(self->super.ops, uct_ud_iface_ops_t);
    uct_ib_qp_attr_t qp_init_attr = {};
    struct ibv_qp_attr qp_attr;
    ucs_status_t status;
    int ret;

    qp_init_attr.qp_type             = IBV_QPT_UD;
    qp_init_attr.sq_sig_all          = 0;
    qp_init_attr.cap.max_send_wr     = config->super.tx.queue_len;
    qp_init_attr.cap.max_recv_wr     = config->super.rx.queue_len;
    qp_init_attr.cap.max_send_sge    = 2;
    qp_init_attr.cap.max_recv_sge    = 1;
    qp_init_attr.cap.max_inline_data = config->super.tx.min_inline;

    status = ops->create_qp(&self->super, &qp_init_attr, &self->qp);
    if (status != UCS_OK) {
        return status;
    }

    self->config.max_inline = qp_init_attr.cap.max_inline_data;

    memset(&qp_attr, 0, sizeof(qp_attr));
    /* Modify QP to INIT state */
    qp_attr.qp_state   = IBV_QPS_INIT;
    qp_attr.pkey_index = self->super.pkey_index;
    qp_attr.port_num   = self->super.config.port_num;
    qp_attr.qkey       = UCT_IB_KEY;
    ret = ibv_modify_qp(self->qp, &qp_attr,
                        IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY);
    if (ret) {
        ucs_error("Failed to modify UD QP to INIT: %m");
        goto err_destroy_qp;
    }

    /* Modify to RTR */
    qp_attr.qp_state = IBV_QPS_RTR;
    ret = ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE);
    if (ret) {
        ucs_error("Failed to modify UD QP to RTR: %m");
        goto err_destroy_qp;
    }

    /* Modify to RTS */
    qp_attr.qp_state = IBV_QPS_RTS;
    qp_attr.sq_psn   = 0;
    ret = ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE | IBV_QP_SQ_PSN);
    if (ret) {
        ucs_error("Failed to modify UD QP to RTS: %m");
        goto err_destroy_qp;
    }

    return UCS_OK;

err_destroy_qp:
    uct_ib_destroy_qp(self->qp);
    return UCS_ERR_INVALID_PARAM;
}

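/* Progress the interface from the async context (timer or event handler),
 * unless async progress is temporarily disabled. If any events were
 * processed, raise the pending-async-event flag so that deferred work is
 * picked up by the main progress path. */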
static inline void uct_ud_iface_async_progress(uct_ud_iface_t *iface)
{
    uct_ud_iface_ops_t *ops =
        ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t);
    unsigned ev_count;

    if (ucs_unlikely(iface->async.disable)) {
        return;
    }

    ev_count = ops->async_progress(iface);
    if (ev_count > 0) {
        uct_ud_iface_raise_pending_async_ev(iface);
    }
}

static void uct_ud_iface_async_handler(int fd, int events, void *arg)
{
    uct_ud_iface_t *iface = arg;

    uct_ud_iface_async_progress(iface);

    /* Arm for new solicited events.
     * If the user asks to be notified about all completion events by calling
     * uct_iface_event_arm(), the RX CQ will be armed again with
     * solicited flag = 0. */
    uct_ib_iface_pre_arm(&iface->super);
    iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_RX, 1);

    ucs_assert(iface->async.event_cb != NULL);
    /* notify user */
    iface->async.event_cb(iface->async.event_arg, 0);
}

static void uct_ud_iface_timer(int timer_id, int events, void *arg)
{
    uct_ud_iface_t *iface = arg;

    uct_ud_iface_async_progress(iface);
}

ucs_status_t uct_ud_iface_complete_init(uct_ud_iface_t *iface)
{
    ucs_async_context_t *async = iface->super.super.worker->async;
    ucs_async_mode_t async_mode = async->mode;
    ucs_status_t status;
    int event_fd;

    status = ucs_twheel_init(&iface->tx.timer, iface->tx.tick / 4,
                             uct_ud_iface_get_time(iface));
    if (status != UCS_OK) {
        goto err;
    }

    status = uct_ib_iface_event_fd_get(&iface->super.super.super, &event_fd);
    if (status != UCS_OK) {
        goto err_twheel_cleanup;
    }

    if (iface->async.event_cb != NULL) {
        status = ucs_async_set_event_handler(async_mode, event_fd,
                                             UCS_EVENT_SET_EVREAD |
                                             UCS_EVENT_SET_EVERR,
                                             uct_ud_iface_async_handler,
                                             iface, async);
        if (status != UCS_OK) {
            goto err_twheel_cleanup;
        }

        status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_RX, 1);
        if (status != UCS_OK) {
            goto err_remove_handler;
        }
    }

    return UCS_OK;

err_remove_handler:
    /* undo the event handler registration if arming the CQ failed */
    ucs_async_remove_handler(event_fd, 1);
err_twheel_cleanup:
    ucs_twheel_cleanup(&iface->tx.timer);
err:
    return status;
}

void uct_ud_iface_remove_async_handlers(uct_ud_iface_t *iface)
{
    ucs_status_t status;
    int event_fd;

    uct_ud_iface_progress_disable(&iface->super.super.super,
                                  UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
    if (iface->async.event_cb != NULL) {
        status = uct_ib_iface_event_fd_get(&iface->super.super.super,
                                           &event_fd);
        if (status == UCS_OK) {
            ucs_async_remove_handler(event_fd, 1);
        }
    }
}

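/* Collect all non-zero GIDs assigned to the local port into a hash table.
 * On RoCE, received packets carry their destination GID in the GRH, and the
 * receive path uses this table to drop packets that were not addressed to
 * this interface (see the ETH_DGID_CHECK configuration option). */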
static ucs_status_t uct_ud_iface_gid_hash_init(uct_ud_iface_t *iface,
                                               uct_md_h md)
{
    static const union ibv_gid zero_gid = { .raw = {0} };
    uct_ib_device_t *dev                = &ucs_derived_of(md, uct_ib_md_t)->dev;
    int port                            = iface->super.config.port_num;
    uct_ib_device_gid_info_t gid_info;
    int gid_idx, gid_tbl_len, kh_ret;
    ucs_status_t status;
    char gid_str[128];

    kh_init_inplace(uct_ud_iface_gid, &iface->gid_table.hash);

    gid_tbl_len = uct_ib_device_port_attr(dev, port)->gid_tbl_len;
    for (gid_idx = 0; gid_idx < gid_tbl_len; ++gid_idx) {
        status = uct_ib_device_query_gid_info(dev->ibv_context,
                                              uct_ib_device_name(dev),
                                              port, gid_idx, &gid_info);
        if (status != UCS_OK) {
            goto err;
        }

        if (!memcmp(&gid_info.gid, &zero_gid, sizeof(zero_gid))) {
            continue;
        }

        ucs_debug("iface %p: adding gid %s to hash on device %s port %d "
                  "index %d", iface,
                  uct_ib_gid_str(&gid_info.gid, gid_str, sizeof(gid_str)),
                  uct_ib_device_name(dev), port, gid_idx);
        kh_put(uct_ud_iface_gid, &iface->gid_table.hash, gid_info.gid,
               &kh_ret);
        if (kh_ret < 0) {
            ucs_error("failed to add gid to hash on device %s port %d index %d",
                      uct_ib_device_name(dev), port, gid_idx);
            status = UCS_ERR_NO_MEMORY;
            goto err;
        }
    }

    iface->gid_table.last     = zero_gid;
    iface->gid_table.last_len = sizeof(zero_gid);
    return UCS_OK;

err:
    kh_destroy_inplace(uct_ud_iface_gid, &iface->gid_table.hash);
    return status;
}

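/* Interface constructor: validates parameters, initializes the base IB
 * iface, creates the UD QP, and sets up the connection-establishment hash,
 * receive/send skb memory pools, pending queues, statistics and the GID
 * table. */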
UCS_CLASS_INIT_FUNC(uct_ud_iface_t, uct_ud_iface_ops_t *ops, uct_md_h md,
                    uct_worker_h worker, const uct_iface_params_t *params,
                    const uct_ud_iface_config_t *config,
                    uct_ib_iface_init_attr_t *init_attr)
{
    ucs_status_t status;
    size_t data_size;
    int mtu;

    UCT_CHECK_PARAM(params->field_mask & UCT_IFACE_PARAM_FIELD_OPEN_MODE,
                    "UCT_IFACE_PARAM_FIELD_OPEN_MODE is not defined");
    if (!(params->open_mode & UCT_IFACE_OPEN_MODE_DEVICE)) {
        ucs_error("only UCT_IFACE_OPEN_MODE_DEVICE is supported");
        return UCS_ERR_UNSUPPORTED;
    }

    ucs_trace_func("%s: iface=%p ops=%p worker=%p rx_headroom=%zu",
                   params->mode.device.dev_name, self, ops, worker,
                   (params->field_mask & UCT_IFACE_PARAM_FIELD_RX_HEADROOM) ?
                   params->rx_headroom : 0);

    if (config->super.tx.queue_len <= UCT_UD_TX_MODERATION) {
        ucs_error("%s ud iface tx queue is too short (%d <= %d)",
                  params->mode.device.dev_name,
                  config->super.tx.queue_len, UCT_UD_TX_MODERATION);
        return UCS_ERR_INVALID_PARAM;
    }

    status = uct_ib_device_mtu(params->mode.device.dev_name, md, &mtu);
    if (status != UCS_OK) {
        return status;
    }

    init_attr->rx_priv_len = sizeof(uct_ud_recv_skb_t) -
                             sizeof(uct_ib_iface_recv_desc_t);
    init_attr->rx_hdr_len  = UCT_IB_GRH_LEN + sizeof(uct_ud_neth_t);
    init_attr->seg_size    = ucs_min(mtu, config->super.seg_size);
    init_attr->qp_type     = IBV_QPT_UD;

    UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker,
                              params, &config->super, init_attr);

    if (self->super.super.worker->async == NULL) {
        ucs_error("%s ud iface must have a valid async context",
                  params->mode.device.dev_name);
        return UCS_ERR_INVALID_PARAM;
    }

    self->tx.unsignaled          = 0;
    self->tx.available           = config->super.tx.queue_len;
    self->tx.timer_sweep_count   = 0;
    self->async.disable          = 0;

    self->rx.available           = config->super.rx.queue_len;
    self->rx.quota               = 0;
    self->config.tx_qp_len       = config->super.tx.queue_len;
    self->config.peer_timeout    = ucs_time_from_sec(config->peer_timeout);
    self->config.check_grh_dgid  = config->dgid_check &&
                                   uct_ib_iface_is_roce(&self->super);

    if ((config->max_window < UCT_UD_CA_MIN_WINDOW) ||
        (config->max_window > UCT_UD_CA_MAX_WINDOW)) {
        ucs_error("Max congestion avoidance window should be >= %d and <= %d (%d)",
                  UCT_UD_CA_MIN_WINDOW, UCT_UD_CA_MAX_WINDOW, config->max_window);
        return UCS_ERR_INVALID_PARAM;
    }

    self->config.max_window = config->max_window;

    self->rx.async_max_poll = config->rx_async_max_poll;

    if (config->timer_tick <= 0.) {
        ucs_error("The timer tick should be > 0 (%lf)", config->timer_tick);
        return UCS_ERR_INVALID_PARAM;
    }
    self->tx.tick = ucs_time_from_sec(config->timer_tick);

    if (config->timer_backoff < UCT_UD_MIN_TIMER_TIMER_BACKOFF) {
        ucs_error("The timer backoff must be >= %lf (%lf)",
                  UCT_UD_MIN_TIMER_TIMER_BACKOFF, config->timer_backoff);
        return UCS_ERR_INVALID_PARAM;
    }
    self->tx.timer_backoff = config->timer_backoff;

    if (config->event_timer_tick <= 0.) {
        ucs_error("The event timer tick should be > 0 (%lf)",
                  config->event_timer_tick);
        return UCS_ERR_INVALID_PARAM;
    }
    self->async.tick = ucs_time_from_sec(config->event_timer_tick);

    if (params->field_mask & UCT_IFACE_PARAM_FIELD_ASYNC_EVENT_CB) {
        self->async.event_cb = params->async_event_cb;
    } else {
        self->async.event_cb = NULL;
    }

    if (params->field_mask & UCT_IFACE_PARAM_FIELD_ASYNC_EVENT_ARG) {
        self->async.event_arg = params->async_event_arg;
    } else {
        self->async.event_arg = NULL;
    }

    self->async.timer_id = 0;

    /* Redefine receive desc release callback */
    self->super.release_desc.cb = uct_ud_iface_release_desc;

    UCT_UD_IFACE_HOOK_INIT(self);

    status = uct_ud_iface_create_qp(self, config);
    if (status != UCS_OK) {
        return status;
    }

    ucs_ptr_array_init(&self->eps, "ud_eps");
    uct_ud_iface_cep_init(self);

    status = uct_ib_iface_recv_mpool_init(&self->super, &config->super,
                                          "ud_recv_skb", &self->rx.mp);
    if (status != UCS_OK) {
        goto err_qp;
    }

    self->rx.available = ucs_min(config->ud_common.rx_queue_len_init,
                                 config->super.rx.queue_len);
    self->rx.quota     = config->super.rx.queue_len - self->rx.available;
    ucs_mpool_grow(&self->rx.mp, self->rx.available);

    data_size = sizeof(uct_ud_ctl_hdr_t) + self->super.addr_size;
    data_size = ucs_max(data_size, self->super.config.seg_size);
    data_size = ucs_max(data_size,
                        sizeof(uct_ud_zcopy_desc_t) + self->config.max_inline);
    data_size = ucs_max(data_size,
                        sizeof(uct_ud_ctl_desc_t) + sizeof(uct_ud_neth_t));
    status = uct_iface_mpool_init(&self->super.super, &self->tx.mp,
                                  sizeof(uct_ud_send_skb_t) + data_size,
                                  sizeof(uct_ud_send_skb_t),
                                  UCT_UD_SKB_ALIGN,
                                  &config->super.tx.mp, self->config.tx_qp_len,
                                  uct_ud_iface_send_skb_init, "ud_tx_skb");
    if (status != UCS_OK) {
        goto err_rx_mpool;
    }

    self->tx.skb                  = NULL;
    self->tx.async_before_pending = 0;

    ucs_arbiter_init(&self->tx.pending_q);
    ucs_queue_head_init(&self->tx.outstanding_q);
    ucs_queue_head_init(&self->tx.async_comp_q);
    ucs_queue_head_init(&self->rx.pending_q);

    status = UCS_STATS_NODE_ALLOC(&self->stats, &uct_ud_iface_stats_class,
                                  self->super.super.stats);
    if (status != UCS_OK) {
        goto err_tx_mpool;
    }

    status = uct_ud_iface_gid_hash_init(self, md);
    if (status != UCS_OK) {
        goto err_release_stats;
    }

    return UCS_OK;

err_release_stats:
    UCS_STATS_NODE_FREE(self->stats);
err_tx_mpool:
    ucs_mpool_cleanup(&self->tx.mp, 1);
err_rx_mpool:
    ucs_mpool_cleanup(&self->rx.mp, 1);
err_qp:
    uct_ib_destroy_qp(self->qp);
    ucs_ptr_array_cleanup(&self->eps);
    return status;
}

static UCS_CLASS_CLEANUP_FUNC(uct_ud_iface_t)
{
    ucs_trace_func("");

    /* TODO: proper flush and connection termination */
    uct_ud_enter(self);
    ucs_twheel_cleanup(&self->tx.timer);
    ucs_debug("iface(%p): cep cleanup", self);
    uct_ud_iface_cep_cleanup(self);
    uct_ud_iface_free_async_comps(self);
    ucs_mpool_cleanup(&self->tx.mp, 0);
    /* TODO: qp to error state and cleanup all wqes */
    uct_ud_iface_free_pending_rx(self);
    ucs_mpool_cleanup(&self->rx.mp, 0);
    uct_ib_destroy_qp(self->qp);
    ucs_debug("iface(%p): ptr_array cleanup", self);
    ucs_ptr_array_cleanup(&self->eps);
    ucs_arbiter_cleanup(&self->tx.pending_q);
    UCS_STATS_NODE_FREE(self->stats);
    kh_destroy_inplace(uct_ud_iface_gid, &self->gid_table.hash);
    uct_ud_leave(self);
}

UCS_CLASS_DEFINE(uct_ud_iface_t, uct_ib_iface_t);

ucs_config_field_t uct_ud_iface_config_table[] = {
    {UCT_IB_CONFIG_PREFIX, "", NULL,
     ucs_offsetof(uct_ud_iface_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},

    {"UD_", "", NULL,
     ucs_offsetof(uct_ud_iface_config_t, ud_common),
     UCS_CONFIG_TYPE_TABLE(uct_ud_iface_common_config_table)},

    {"TIMEOUT", "5.0m", "Transport timeout",
     ucs_offsetof(uct_ud_iface_config_t, peer_timeout), UCS_CONFIG_TYPE_TIME},
    {"TIMER_TICK", "10ms", "Initial timeout for retransmissions",
     ucs_offsetof(uct_ud_iface_config_t, timer_tick), UCS_CONFIG_TYPE_TIME},
    {"TIMER_BACKOFF", "2.0",
     "Timeout multiplier for resending trigger (must be >= "
     UCS_PP_MAKE_STRING(UCT_UD_MIN_TIMER_TIMER_BACKOFF) ")",
     ucs_offsetof(uct_ud_iface_config_t, timer_backoff),
     UCS_CONFIG_TYPE_DOUBLE},
    {"ASYNC_TIMER_TICK", "100ms", "Resolution for async timer",
     ucs_offsetof(uct_ud_iface_config_t, event_timer_tick), UCS_CONFIG_TYPE_TIME},
    {"ETH_DGID_CHECK", "y",
     "Enable checking destination GID for incoming packets of Ethernet network.\n"
     "Mismatched packets are silently dropped.",
     ucs_offsetof(uct_ud_iface_config_t, dgid_check), UCS_CONFIG_TYPE_BOOL},

    {"MAX_WINDOW", UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW),
     "Max congestion avoidance window. Should be >= "
     UCS_PP_MAKE_STRING(UCT_UD_CA_MIN_WINDOW) " and <= "
     UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW),
     ucs_offsetof(uct_ud_iface_config_t, max_window), UCS_CONFIG_TYPE_UINT},

    {"RX_ASYNC_MAX_POLL", "64",
     "Max number of receive completions to pick during asynchronous TX poll",
     ucs_offsetof(uct_ud_iface_config_t, rx_async_max_poll), UCS_CONFIG_TYPE_UINT},

    {NULL}
};


ucs_status_t uct_ud_iface_query(uct_ud_iface_t *iface,
                                uct_iface_attr_t *iface_attr,
                                size_t am_max_iov, size_t am_max_hdr)
{
    ucs_status_t status;

    status = uct_ib_iface_query(&iface->super,
                                UCT_IB_DETH_LEN + sizeof(uct_ud_neth_t),
                                iface_attr);
    if (status != UCS_OK) {
        return status;
    }

    iface_attr->cap.flags              = UCT_IFACE_FLAG_AM_BCOPY         |
                                         UCT_IFACE_FLAG_AM_ZCOPY         |
                                         UCT_IFACE_FLAG_CONNECT_TO_EP    |
                                         UCT_IFACE_FLAG_CONNECT_TO_IFACE |
                                         UCT_IFACE_FLAG_PENDING          |
                                         UCT_IFACE_FLAG_CB_SYNC          |
                                         UCT_IFACE_FLAG_CB_ASYNC         |
                                         UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;
    iface_attr->cap.event_flags        = UCT_IFACE_FLAG_EVENT_SEND_COMP |
                                         UCT_IFACE_FLAG_EVENT_RECV      |
                                         UCT_IFACE_FLAG_EVENT_ASYNC_CB;

    iface_attr->cap.am.max_short       = uct_ib_iface_hdr_size(iface->config.max_inline,
                                                               sizeof(uct_ud_neth_t));
    iface_attr->cap.am.max_bcopy       = iface->super.config.seg_size - sizeof(uct_ud_neth_t);
    iface_attr->cap.am.min_zcopy       = 0;
    iface_attr->cap.am.max_zcopy       = iface->super.config.seg_size - sizeof(uct_ud_neth_t);
    iface_attr->cap.am.align_mtu       = uct_ib_mtu_value(uct_ib_iface_port_attr(&iface->super)->active_mtu);
    iface_attr->cap.am.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
    iface_attr->cap.am.max_iov         = am_max_iov;
    iface_attr->cap.am.max_hdr         = am_max_hdr;

    iface_attr->cap.put.max_short      = uct_ib_iface_hdr_size(iface->config.max_inline,
                                                               sizeof(uct_ud_neth_t) +
                                                               sizeof(uct_ud_put_hdr_t));

    iface_attr->iface_addr_len         = sizeof(uct_ud_iface_addr_t);
    iface_attr->ep_addr_len            = sizeof(uct_ud_ep_addr_t);
    iface_attr->max_conn_priv          = 0;

    /* UD lacks scatter-to-CQE support */
    iface_attr->latency.c             += 30e-9;

    if (iface_attr->cap.am.max_short) {
        iface_attr->cap.flags |= UCT_IFACE_FLAG_AM_SHORT;
    }

    return UCS_OK;
}

ucs_status_t
uct_ud_iface_get_address(uct_iface_h tl_iface, uct_iface_addr_t *iface_addr)
{
    uct_ud_iface_t *iface     = ucs_derived_of(tl_iface, uct_ud_iface_t);
    uct_ud_iface_addr_t *addr = (uct_ud_iface_addr_t *)iface_addr;

    uct_ib_pack_uint24(addr->qp_num, iface->qp->qp_num);

    return UCS_OK;
}

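/* Flush the whole interface: return UCS_INPROGRESS while asynchronous events
 * or outstanding control skbs are pending, otherwise flush every endpoint
 * and report UCS_INPROGRESS if any of them could not complete synchronously.
 * Flush completion callbacks are not supported (comp must be NULL). */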
ucs_status_t uct_ud_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                uct_completion_t *comp)
{
    uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
    uct_ud_ep_t *ep;
    ucs_status_t status;
    int i, count;

    ucs_trace_func("");

    if (comp != NULL) {
        return UCS_ERR_UNSUPPORTED;
    }

    uct_ud_enter(iface);

    if (ucs_unlikely(uct_ud_iface_has_pending_async_ev(iface) ||
                     !ucs_queue_is_empty(&iface->tx.outstanding_q))) {
        UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
        uct_ud_leave(iface);
        return UCS_INPROGRESS;
    }

    count = 0;
    ucs_ptr_array_for_each(ep, i, &iface->eps) {
        /* ud ep flush returns either ok or in progress */
        status = uct_ud_ep_flush_nolock(iface, ep, NULL);
        if ((status == UCS_INPROGRESS) || (status == UCS_ERR_NO_RESOURCE)) {
            ++count;
        }
    }

    uct_ud_leave(iface);
    if (count != 0) {
        UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
        return UCS_INPROGRESS;
    }

    UCT_TL_IFACE_STAT_FLUSH(&iface->super.super);
    return UCS_OK;
}

void uct_ud_iface_add_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
    ep->ep_id = ucs_ptr_array_insert(&iface->eps, ep);
}

void uct_ud_iface_remove_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
    if (ep->ep_id != UCT_UD_EP_NULL_ID) {
        ucs_trace("iface(%p) remove ep: %p id %d", iface, ep, ep->ep_id);
        ucs_ptr_array_remove(&iface->eps, ep->ep_id);
    }
}

void uct_ud_iface_replace_ep(uct_ud_iface_t *iface,
                             uct_ud_ep_t *old_ep, uct_ud_ep_t *new_ep)
{
    void *p;

    ucs_assert_always(old_ep != new_ep);
    ucs_assert_always(old_ep->ep_id != new_ep->ep_id);
    p = ucs_ptr_array_replace(&iface->eps, old_ep->ep_id, new_ep);
    ucs_assert_always(p == (void *)old_ep);
    ucs_trace("replace_ep: old(%p) id=%d new(%p) id=%d",
              old_ep, old_ep->ep_id, new_ep, new_ep->ep_id);
    ucs_ptr_array_remove(&iface->eps, new_ep->ep_id);
}

uct_ud_send_skb_t *uct_ud_iface_ctl_skb_get(uct_ud_iface_t *iface)
{
    uct_ud_send_skb_t *skb;

    /* grow the reserved skb queue on demand */
    skb = ucs_mpool_get(&iface->tx.mp);
    if (skb == NULL) {
        ucs_fatal("failed to allocate control skb");
    }

    VALGRIND_MAKE_MEM_DEFINED(&skb->lkey, sizeof(skb->lkey));
    skb->flags = 0;
    return skb;
}

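/* Dispatch send completions that were queued on tx.async_comp_q by the
 * async progress path, which defers user completion callbacks to the main
 * (synchronous) progress context. */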
void uct_ud_iface_dispatch_async_comps_do(uct_ud_iface_t *iface)
{
    uct_ud_comp_desc_t *cdesc;
    uct_ud_send_skb_t *skb;

    ucs_queue_for_each_extract(skb, &iface->tx.async_comp_q, queue, 1) {
        ucs_assert(!(skb->flags & UCT_UD_SEND_SKB_FLAG_RESENDING));
        cdesc = uct_ud_comp_desc(skb);
        uct_ud_iface_dispatch_comp(iface, cdesc->comp, cdesc->status);
        uct_ud_skb_release(skb, 0);
    }
}

static void uct_ud_iface_free_async_comps(uct_ud_iface_t *iface)
{
    uct_ud_send_skb_t *skb;

    ucs_queue_for_each_extract(skb, &iface->tx.async_comp_q, queue, 1) {
        uct_ud_skb_release(skb, 0);
    }
}

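/* Deliver AM packets that were queued on rx.pending_q instead of being
 * handled directly at receive time. Stops and returns UCS_ERR_NO_RESOURCE
 * once the rx_max_poll budget is exhausted while packets are still pending. */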
ucs_status_t uct_ud_iface_dispatch_pending_rx_do(uct_ud_iface_t *iface)
{
    unsigned max_poll = iface->super.config.rx_max_poll;
    uct_ud_recv_skb_t *skb;
    uct_ud_neth_t *neth;
    unsigned count;

    count = 0;
    do {
        skb  = ucs_queue_pull_elem_non_empty(&iface->rx.pending_q,
                                             uct_ud_recv_skb_t, u.am.queue);
        neth = (uct_ud_neth_t *)((char *)uct_ib_iface_recv_desc_hdr(&iface->super,
                                                                    (uct_ib_iface_recv_desc_t *)skb) +
                                 UCT_IB_GRH_LEN);
        uct_ib_iface_invoke_am_desc(&iface->super,
                                    uct_ud_neth_get_am_id(neth),
                                    neth + 1,
                                    skb->u.am.len,
                                    &skb->super);
        count++;
        if (count >= max_poll) {
            return UCS_ERR_NO_RESOURCE;
        }
    } while (!ucs_queue_is_empty(&iface->rx.pending_q));

    return UCS_OK;
}

static void uct_ud_iface_free_pending_rx(uct_ud_iface_t *iface)
{
    uct_ud_recv_skb_t *skb;

    while (!ucs_queue_is_empty(&iface->rx.pending_q)) {
        skb = ucs_queue_pull_elem_non_empty(&iface->rx.pending_q,
                                            uct_ud_recv_skb_t, u.am.queue);
        ucs_mpool_put(skb);
    }
}

void uct_ud_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
    uct_ud_iface_t *iface = ucs_container_of(self, uct_ud_iface_t,
                                             super.release_desc);

    uct_ud_enter(iface);
    uct_ib_iface_release_desc(self, desc);
    uct_ud_leave(iface);
}

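/* Arm the interface for event notification. Returns UCS_ERR_BUSY if receives
 * or send completions are still undelivered, so the caller keeps polling
 * instead of sleeping on an event that has effectively already fired. */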
ucs_status_t uct_ud_iface_event_arm(uct_iface_h tl_iface, unsigned events)
{
    uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
    ucs_status_t status;

    uct_ud_enter(iface);

    status = uct_ib_iface_pre_arm(&iface->super);
    if (status != UCS_OK) {
        goto out;
    }

    /* Check if some receives were not delivered yet */
    if ((events & (UCT_EVENT_RECV | UCT_EVENT_RECV_SIG)) &&
        !ucs_queue_is_empty(&iface->rx.pending_q))
    {
        status = UCS_ERR_BUSY;
        goto out;
    }

    /* Check if some send completions were not delivered yet */
    if ((events & UCT_EVENT_SEND_COMP) &&
        !ucs_queue_is_empty(&iface->tx.async_comp_q))
    {
        status = UCS_ERR_BUSY;
        goto out;
    }

    if (events & UCT_EVENT_SEND_COMP) {
        status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_TX, 0);
        if (status != UCS_OK) {
            goto out;
        }
    }

    if (events & (UCT_EVENT_SEND_COMP | UCT_EVENT_RECV)) {
        /* we may get send completions through ACKs as well */
        status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_RX, 0);
        if (status != UCS_OK) {
            goto out;
        }
    }

    status = UCS_OK;
out:
    uct_ud_leave(iface);
    return status;
}

void uct_ud_iface_progress_enable(uct_iface_h tl_iface, unsigned flags)
{
    uct_ud_iface_t *iface       = ucs_derived_of(tl_iface, uct_ud_iface_t);
    ucs_async_context_t *async  = iface->super.super.worker->async;
    ucs_async_mode_t async_mode = async->mode;
    ucs_status_t status;

    uct_ud_enter(iface);

    if (flags & UCT_PROGRESS_RECV) {
        iface->rx.available += iface->rx.quota;
        iface->rx.quota      = 0;
        /* let progress (possibly async) post the missing receives */
    }

    if (iface->async.timer_id == 0) {
        status = ucs_async_add_timer(async_mode, iface->async.tick,
                                     uct_ud_iface_timer, iface, async,
                                     &iface->async.timer_id);
        if (status != UCS_OK) {
            ucs_fatal("iface(%p): unable to add iface timer handler - %s",
                      iface, ucs_status_string(status));
        }
        ucs_assert(iface->async.timer_id != 0);
    }

    uct_ud_leave(iface);

    uct_base_iface_progress_enable(tl_iface, flags);
}

void uct_ud_iface_progress_disable(uct_iface_h tl_iface, unsigned flags)
{
    uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
    ucs_status_t status;

    uct_ud_enter(iface);

    if (iface->async.timer_id != 0) {
        status = ucs_async_remove_handler(iface->async.timer_id, 1);
        if (status != UCS_OK) {
            ucs_fatal("iface(%p): unable to remove iface timer handler (%d) - %s",
                      iface, iface->async.timer_id, ucs_status_string(status));
        }
        iface->async.timer_id = 0;
    }

    uct_ud_leave(iface);

    uct_base_iface_progress_disable(tl_iface, flags);
}

void uct_ud_iface_ctl_skb_complete(uct_ud_iface_t *iface,
                                   uct_ud_ctl_desc_t *cdesc, int is_async)
{
    uct_ud_send_skb_t *resent_skb, *skb;

    skb = cdesc->self_skb;
    ucs_assert(!(skb->flags & UCT_UD_SEND_SKB_FLAG_INVALID));

    resent_skb = cdesc->resent_skb;
    ucs_assert(uct_ud_ctl_desc(skb) == cdesc);

    if (resent_skb != NULL) {
        ucs_assert(skb->flags        & UCT_UD_SEND_SKB_FLAG_CTL_RESEND);
        ucs_assert(resent_skb->flags & UCT_UD_SEND_SKB_FLAG_RESENDING);

        resent_skb->flags &= ~UCT_UD_SEND_SKB_FLAG_RESENDING;
        --cdesc->ep->tx.resend_count;

        uct_ud_ep_window_release_completed(cdesc->ep, is_async);
    } else {
        ucs_assert(skb->flags & UCT_UD_SEND_SKB_FLAG_CTL_ACK);
    }

    uct_ud_skb_release(skb, 0);
}

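/* Complete all outstanding control descriptors whose sequence number is at
 * most 'sn'. The outstanding queue is ordered by sn and
 * UCS_CIRCULAR_COMPARE16 accounts for 16-bit sequence number wrap-around, so
 * extraction stops at the first descriptor that is still in flight. */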
void uct_ud_iface_send_completion(uct_ud_iface_t *iface, uint16_t sn,
                                  int is_async)
{
    uct_ud_ctl_desc_t *cdesc;

    ucs_queue_for_each_extract(cdesc, &iface->tx.outstanding_q, queue,
                               UCS_CIRCULAR_COMPARE16(cdesc->sn, <=, sn)) {
        uct_ud_iface_ctl_skb_complete(iface, cdesc, is_async);
    }
}

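/* Return a pointer to the destination GID in the GRH of a received packet,
 * normalizing an IPv4 destination to its IPv4-mapped-IPv6 form so the result
 * can be matched against entries in the interface GID table. */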
union ibv_gid *uct_ud_grh_get_dgid(struct ibv_grh *grh, size_t dgid_len)
{
    size_t i;

    /* Make sure that daddr in IPv4 resides in the last 4 bytes in GRH */
    UCS_STATIC_ASSERT((UCT_IB_GRH_LEN - (20 + offsetof(struct iphdr, daddr))) ==
                      UCS_IPV4_ADDR_LEN);

    /* Make sure that dgid resides in the last 16 bytes in GRH */
    UCS_STATIC_ASSERT((UCT_IB_GRH_LEN - offsetof(struct ibv_grh, dgid)) ==
                      UCS_IPV6_ADDR_LEN);

    ucs_assert((dgid_len == UCS_IPV4_ADDR_LEN) ||
               (dgid_len == UCS_IPV6_ADDR_LEN));

    /*
     * According to Annex17_RoCEv2 (A17.4.5.2):
     * "The first 40 bytes of user posted UD Receive Buffers are reserved for the L3
     * header of the incoming packet (as per the InfiniBand Spec Section 11.4.1.2).
     * In RoCEv2, this area is filled up with the IP header. IPv6 header uses the
     * entire 40 bytes. IPv4 headers use the 20 bytes in the second half of the
     * reserved 40 bytes area (i.e. offset 20 from the beginning of the receive
     * buffer). In this case, the content of the first 20 bytes is undefined."
     */
    if (dgid_len == UCS_IPV4_ADDR_LEN) {
        /* An IPv4 address mapped to IPv6 looks like
           0000:0000:0000:0000:0000:ffff:????:????; reset the prefix bytes
           explicitly so that hashing the gid works correctly */
        for (i = 0; i < (sizeof(union ibv_gid) - UCS_IPV4_ADDR_LEN - 2);) {
            grh->dgid.raw[i++] = 0x00;
        }

        grh->dgid.raw[i++] = 0xff;
        grh->dgid.raw[i++] = 0xff;
    }

    return &grh->dgid;
}