/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */
6 
7 static UCS_F_ALWAYS_INLINE void
uct_ud_ep_ctl_op_schedule(uct_ud_iface_t * iface,uct_ud_ep_t * ep)8 uct_ud_ep_ctl_op_schedule(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
9 {
10     ucs_arbiter_group_push_elem(&ep->tx.pending.group,
11                                 &ep->tx.pending.elem);
12     ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
13 }
14 
15 /**
16  * schedule control operation.
17  */
18 static UCS_F_ALWAYS_INLINE void
uct_ud_ep_ctl_op_add(uct_ud_iface_t * iface,uct_ud_ep_t * ep,int op)19 uct_ud_ep_ctl_op_add(uct_ud_iface_t *iface, uct_ud_ep_t *ep, int op)
20 {
21     ep->tx.pending.ops |= op;
22     uct_ud_ep_ctl_op_schedule(iface, ep);
23 }
24 
25 static UCS_F_ALWAYS_INLINE void
uct_ud_ep_tx_stop(uct_ud_ep_t * ep)26 uct_ud_ep_tx_stop(uct_ud_ep_t *ep)
27 {
28     ep->tx.max_psn = ep->tx.psn;
29 }
30 
31 /*
32  * check iface resources:tx_queue and return
33  * prefetched/cached skb
34  *
35  * NOTE: caller must not return skb to mpool until it is
36  * removed from the cache
37  * skb is removed from cache by
38  *  uct_ud_iface_complete_tx_inl()
39  *  uct_ud_iface_complete_tx_skb()
40  *
41  * In case of error flow caller must do nothing with the skb
42  */
43 static UCS_F_ALWAYS_INLINE
uct_ud_iface_get_tx_skb(uct_ud_iface_t * iface,uct_ud_ep_t * ep)44 uct_ud_send_skb_t *uct_ud_iface_get_tx_skb(uct_ud_iface_t *iface,
45                                            uct_ud_ep_t *ep)
46 {
47     uct_ud_send_skb_t *skb;
48 
49     if (ucs_unlikely(!uct_ud_iface_can_tx(iface))) {
50         UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
51         return NULL;
52     }
53 
54     skb = iface->tx.skb;
55     if (ucs_unlikely(skb == NULL)) {
56         skb = ucs_mpool_get(&iface->tx.mp);
57         if (skb == NULL) {
58             ucs_trace_data("iface=%p out of tx skbs", iface);
59             UCT_TL_IFACE_STAT_TX_NO_DESC(&iface->super.super);
60             return NULL;
61         }
62         iface->tx.skb = skb;
63     }
64     VALGRIND_MAKE_MEM_DEFINED(&skb->lkey, sizeof(skb->lkey));
65     skb->flags = 0;
66     ucs_prefetch(skb->neth);
67     return skb;
68 }
69 
70 static UCS_F_ALWAYS_INLINE void
uct_ud_skb_release(uct_ud_send_skb_t * skb,int is_inline)71 uct_ud_skb_release(uct_ud_send_skb_t *skb, int is_inline)
72 {
73     ucs_assert(!(skb->flags & UCT_UD_SEND_SKB_FLAG_INVALID));
74     skb->flags = UCT_UD_SEND_SKB_FLAG_INVALID;
75     if (is_inline) {
76         ucs_mpool_put_inline(skb);
77     } else {
78         ucs_mpool_put(skb);
79     }
80 }
81 
#if UCS_ENABLE_ASSERT
/* Assertion helper: the endpoint is considered to have pending requests
 * unless its pending group is empty or its own control element is the only
 * element in it. */
static UCS_F_ALWAYS_INLINE int uct_ud_ep_has_pending(uct_ud_ep_t *ep)
{
    return !(ucs_arbiter_group_is_empty(&ep->tx.pending.group) ||
             ucs_arbiter_elem_is_only(&ep->tx.pending.elem));
}
#endif
89 
uct_ud_ep_set_has_pending_flag(uct_ud_ep_t * ep)90 static UCS_F_ALWAYS_INLINE void uct_ud_ep_set_has_pending_flag(uct_ud_ep_t *ep)
91 {
92     ep->flags |= UCT_UD_EP_FLAG_HAS_PENDING;
93 }
94 
uct_ud_ep_remove_has_pending_flag(uct_ud_ep_t * ep)95 static UCS_F_ALWAYS_INLINE void uct_ud_ep_remove_has_pending_flag(uct_ud_ep_t *ep)
96 {
97     ucs_assert(ep->flags & UCT_UD_EP_FLAG_HAS_PENDING);
98     ep->flags &= ~UCT_UD_EP_FLAG_HAS_PENDING;
99 }
100 
uct_ud_ep_set_dest_ep_id(uct_ud_ep_t * ep,uint32_t dest_id)101 static UCS_F_ALWAYS_INLINE void uct_ud_ep_set_dest_ep_id(uct_ud_ep_t *ep,
102                                                          uint32_t dest_id)
103 {
104     ucs_assert(dest_id != UCT_UD_EP_NULL_ID);
105     ep->dest_ep_id = dest_id;
106     ep->flags     |= UCT_UD_EP_FLAG_CONNECTED;
107 }
108 
109 /* same as above but also check ep resources: window&connection state */
110 static UCS_F_ALWAYS_INLINE uct_ud_send_skb_t *
uct_ud_ep_get_tx_skb(uct_ud_iface_t * iface,uct_ud_ep_t * ep)111 uct_ud_ep_get_tx_skb(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
112 {
113     if (ucs_unlikely(!uct_ud_ep_is_connected_and_no_pending(ep) ||
114                      uct_ud_ep_no_window(ep) ||
115                      uct_ud_iface_has_pending_async_ev(iface))) {
116         ucs_trace_poll("iface=%p ep=%p (%d->%d) no ep resources (psn=%u max_psn=%u)",
117                        iface, ep, ep->ep_id, ep->dest_ep_id,
118                        (unsigned)ep->tx.psn,
119                        (unsigned)ep->tx.max_psn);
120         UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
121         return NULL;
122     }
123 
124     return uct_ud_iface_get_tx_skb(iface, ep);
125 }
126 
127 static UCS_F_ALWAYS_INLINE void
uct_ud_skb_set_zcopy_desc(uct_ud_send_skb_t * skb,const uct_iov_t * iov,size_t iovcnt,uct_completion_t * comp)128 uct_ud_skb_set_zcopy_desc(uct_ud_send_skb_t *skb, const uct_iov_t *iov,
129                           size_t iovcnt, uct_completion_t *comp)
130 {
131     uct_ud_zcopy_desc_t *zdesc;
132     size_t iov_it_length;
133     uct_ud_iov_t *ud_iov;
134     size_t iov_it;
135 
136     skb->flags        |= UCT_UD_SEND_SKB_FLAG_ZCOPY;
137     zdesc              = uct_ud_zcopy_desc(skb);
138     zdesc->iovcnt      = 0;
139     for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
140         iov_it_length = uct_iov_get_length(iov + iov_it);
141         if (iov_it_length == 0) {
142             continue;
143         }
144 
145         ucs_assert(iov_it_length <= UINT16_MAX);
146         ud_iov         = &zdesc->iov[zdesc->iovcnt++];
147         ud_iov->buffer = iov[iov_it].buffer;
148         ud_iov->lkey   = uct_ib_memh_get_lkey(iov[iov_it].memh);
149         ud_iov->length = iov_it_length;
150     }
151     if (comp != NULL) {
152         skb->flags        |= UCT_UD_SEND_SKB_FLAG_COMP;
153         zdesc->super.comp  = comp;
154     }
155 }
156 
157 static UCS_F_ALWAYS_INLINE void
uct_ud_iface_complete_tx(uct_ud_iface_t * iface,uct_ud_ep_t * ep,uct_ud_send_skb_t * skb,int has_data,void * data,const void * buffer,unsigned length)158 uct_ud_iface_complete_tx(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
159                          uct_ud_send_skb_t *skb, int has_data, void *data,
160                          const void *buffer, unsigned length)
161 {
162     ucs_time_t now = uct_ud_iface_get_time(iface);
163     iface->tx.skb  = ucs_mpool_get(&iface->tx.mp);
164     ep->tx.psn++;
165 
166     if (has_data) {
167         skb->len += length;
168         memcpy(data, buffer, length);
169     }
170 
171     ucs_queue_push(&ep->tx.window, &skb->queue);
172     ep->tx.tick = iface->tx.tick;
173 
174     if (!iface->async.disable) {
175         ucs_wtimer_add(&iface->tx.timer, &ep->timer,
176                        now - ucs_twheel_get_time(&iface->tx.timer) + ep->tx.tick);
177     }
178 
179     ep->tx.send_time = now;
180 }
181 
182 static UCS_F_ALWAYS_INLINE void
uct_ud_iface_complete_tx_inl(uct_ud_iface_t * iface,uct_ud_ep_t * ep,uct_ud_send_skb_t * skb,void * data,const void * buffer,unsigned length)183 uct_ud_iface_complete_tx_inl(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
184                              uct_ud_send_skb_t *skb, void *data,
185                              const void *buffer, unsigned length)
186 {
187     uct_ud_iface_complete_tx(iface, ep, skb, 1, data, buffer, length);
188 }
189 
190 static UCS_F_ALWAYS_INLINE void
uct_ud_iface_complete_tx_skb(uct_ud_iface_t * iface,uct_ud_ep_t * ep,uct_ud_send_skb_t * skb)191 uct_ud_iface_complete_tx_skb(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
192                              uct_ud_send_skb_t *skb)
193 {
194     uct_ud_iface_complete_tx(iface, ep, skb, 0, NULL, NULL, 0);
195 }
196 
197 static UCS_F_ALWAYS_INLINE ucs_status_t
uct_ud_am_skb_common(uct_ud_iface_t * iface,uct_ud_ep_t * ep,uint8_t id,uct_ud_send_skb_t ** skb_p)198 uct_ud_am_skb_common(uct_ud_iface_t *iface, uct_ud_ep_t *ep, uint8_t id,
199                      uct_ud_send_skb_t **skb_p)
200 {
201     uct_ud_send_skb_t *skb;
202     uct_ud_neth_t *neth;
203 
204     UCT_CHECK_AM_ID(id);
205 
206     skb = uct_ud_ep_get_tx_skb(iface, ep);
207     if (!skb) {
208         return UCS_ERR_NO_RESOURCE;
209     }
210 
211     /* either we are executing pending operations, or there are no any pending
212      * elements, or the only pending element is for sending control messages
213      * (we don't care about reordering with respect to control messages)
214      */
215     ucs_assertv((ep->flags & UCT_UD_EP_FLAG_IN_PENDING) ||
216                 !uct_ud_ep_has_pending(ep),
217                 "out-of-order send detected for ep %p am %d ep_pending %d arbelem %p",
218                 ep, id, (ep->flags & UCT_UD_EP_FLAG_IN_PENDING),
219                 &ep->tx.pending.elem);
220 
221     neth = skb->neth;
222     uct_ud_neth_init_data(ep, neth);
223     uct_ud_neth_set_type_am(ep, neth, id);
224     uct_ud_neth_ack_req(ep, neth);
225 
226     *skb_p = skb;
227     return UCS_OK;
228 }
229 
230 static UCS_F_ALWAYS_INLINE size_t
uct_ud_skb_bcopy(uct_ud_send_skb_t * skb,uct_pack_callback_t pack_cb,void * arg)231 uct_ud_skb_bcopy(uct_ud_send_skb_t *skb, uct_pack_callback_t pack_cb, void *arg)
232 {
233     size_t payload_len;
234 
235     payload_len = pack_cb(skb->neth + 1, arg);
236     skb->len = sizeof(skb->neth[0]) + payload_len;
237     return payload_len;
238 }
239 
240 static UCS_F_ALWAYS_INLINE void
uct_ud_iface_dispatch_comp(uct_ud_iface_t * iface,uct_completion_t * comp,ucs_status_t status)241 uct_ud_iface_dispatch_comp(uct_ud_iface_t *iface, uct_completion_t *comp,
242                            ucs_status_t status)
243 {
244     /* Avoid reordering with pending queue - if we have any pending requests,
245      * prevent send operations from the completion callback
246      */
247     uct_ud_iface_raise_pending_async_ev(iface);
248     uct_invoke_completion(comp, status);
249 }
250 
251 static UCS_F_ALWAYS_INLINE void
uct_ud_iface_add_async_comp(uct_ud_iface_t * iface,uct_ud_send_skb_t * skb,ucs_status_t status)252 uct_ud_iface_add_async_comp(uct_ud_iface_t *iface, uct_ud_send_skb_t *skb,
253                             ucs_status_t status)
254 {
255     uct_ud_comp_desc_t *cdesc = uct_ud_comp_desc(skb);
256 
257     cdesc->status = status;
258     ucs_queue_push(&iface->tx.async_comp_q, &skb->queue);
259 }
260 
261