1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
3 * Copyright (c) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7 
8 #ifdef HAVE_CONFIG_H
9 #  include "config.h"
10 #endif
11 
12 #include "rc_ep.h"
13 #include "rc_iface.h"
14 
15 #include <uct/ib/base/ib_verbs.h>
16 #include <ucs/debug/memtrack.h>
17 #include <ucs/debug/log.h>
18 #include <ucs/type/class.h>
19 #include <endian.h>
20 
#ifdef ENABLE_STATS
/*
 * Counter layout for the RC flow-control statistics node.
 * The node is allocated by uct_rc_fc_init() under the endpoint's stats node.
 */
static ucs_stats_class_t uct_rc_fc_stats_class = {
    .name          = "rc_fc",
    .num_counters  = UCT_RC_FC_STAT_LAST,
    .counter_names = {
        [UCT_RC_FC_STAT_NO_CRED]       = "no_cred",
        [UCT_RC_FC_STAT_TX_GRANT]      = "tx_grant",
        [UCT_RC_FC_STAT_TX_PURE_GRANT] = "tx_pure_grant",
        [UCT_RC_FC_STAT_TX_SOFT_REQ]   = "tx_soft_req",
        [UCT_RC_FC_STAT_TX_HARD_REQ]   = "tx_hard_req",
        [UCT_RC_FC_STAT_RX_GRANT]      = "rx_grant",
        [UCT_RC_FC_STAT_RX_PURE_GRANT] = "rx_pure_grant",
        [UCT_RC_FC_STAT_RX_SOFT_REQ]   = "rx_soft_req",
        [UCT_RC_FC_STAT_RX_HARD_REQ]   = "rx_hard_req",
        [UCT_RC_FC_STAT_FC_WND]        = "fc_wnd"
    }
};

/*
 * Counter layout for the per-QP TX statistics node.
 * The node is allocated by uct_rc_txqp_init(), suffixed with the QP number.
 */
static ucs_stats_class_t uct_rc_txqp_stats_class = {
    .name          = "rc_txqp",
    .num_counters  = UCT_RC_TXQP_STAT_LAST,
    .counter_names = {
        [UCT_RC_TXQP_STAT_QP_FULL] = "qp_full",
        [UCT_RC_TXQP_STAT_SIGNAL]  = "signal"
    }
};
#endif
48 
uct_rc_txqp_init(uct_rc_txqp_t * txqp,uct_rc_iface_t * iface,uint32_t qp_num UCS_STATS_ARG (ucs_stats_node_t * stats_parent))49 ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface,
50                               uint32_t qp_num
51                               UCS_STATS_ARG(ucs_stats_node_t* stats_parent))
52 {
53     txqp->unsignaled = 0;
54     txqp->unsignaled_store = 0;
55     txqp->unsignaled_store_count = 0;
56     txqp->available  = 0;
57     ucs_queue_head_init(&txqp->outstanding);
58 
59     return UCS_STATS_NODE_ALLOC(&txqp->stats, &uct_rc_txqp_stats_class,
60                                 stats_parent, "-0x%x", qp_num);
61 }
62 
/**
 * Release a TX QP tracking structure.
 *
 * First cancels every still-outstanding operation (completing users with
 * UCS_ERR_CANCELED and logging a warning per operation), then frees the
 * statistics node.
 *
 * @param txqp  TX QP state previously set up by uct_rc_txqp_init().
 */
void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp)
{
    const int warn_on_pending = 1;

    uct_rc_txqp_purge_outstanding(txqp, UCS_ERR_CANCELED, warn_on_pending);
    UCS_STATS_NODE_FREE(txqp->stats);
}
68 
uct_rc_fc_init(uct_rc_fc_t * fc,int16_t winsize UCS_STATS_ARG (ucs_stats_node_t * stats_parent))69 ucs_status_t uct_rc_fc_init(uct_rc_fc_t *fc, int16_t winsize
70                             UCS_STATS_ARG(ucs_stats_node_t* stats_parent))
71 {
72     ucs_status_t status;
73 
74     fc->fc_wnd     = winsize;
75     fc->flags      = 0;
76 
77     status = UCS_STATS_NODE_ALLOC(&fc->stats, &uct_rc_fc_stats_class,
78                                   stats_parent);
79     if (status != UCS_OK) {
80        return status;
81     }
82 
83     UCS_STATS_SET_COUNTER(fc->stats, UCT_RC_FC_STAT_FC_WND, fc->fc_wnd);
84 
85     return UCS_OK;
86 }
87 
/**
 * Release endpoint flow-control state.
 *
 * The only resource owned by the FC state is its statistics node.
 *
 * @param fc  Flow-control state previously set up by uct_rc_fc_init().
 */
void uct_rc_fc_cleanup(uct_rc_fc_t *fc)
{
    ucs_stats_node_t *fc_stats_node UCS_V_UNUSED;

    UCS_STATS_NODE_FREE(fc->stats);
}
92 
/*
 * Constructor for uct_rc_ep_t.
 *
 * Construction order is: base endpoint, TX QP state, path index, flow-control
 * state, arbiter group, and finally linking into iface->ep_list. If
 * flow-control init fails, the TX QP state is torn down before the error is
 * returned.
 * NOTE(review): failure handling of the base class is assumed to be done by
 * the UCS class framework (UCS_CLASS_CALL_SUPER_INIT) -- confirm.
 */
UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface, uint32_t qp_num,
                    const uct_ep_params_t *params)
{
    ucs_status_t status;

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

    status = uct_rc_txqp_init(&self->txqp, iface, qp_num
                              UCS_STATS_ARG(self->super.stats));
    if (status != UCS_OK) {
        return status;
    }

    self->path_index = UCT_EP_PARAMS_GET_PATH_INDEX(params);

    status = uct_rc_fc_init(&self->fc, iface->config.fc_wnd_size
                            UCS_STATS_ARG(self->super.stats));
    if (status != UCS_OK) {
        /* Unwind the TX QP state set up above */
        uct_rc_txqp_cleanup(&self->txqp);
        return status;
    }

    /* FC bits share the AM id byte; make sure the mask still fits in it
     * should the AM id space ever be extended */
    UCS_STATIC_ASSERT(UCT_RC_EP_FC_MASK < UINT8_MAX);

    ucs_arbiter_group_init(&self->arb_group);
    ucs_list_add_head(&iface->ep_list, &self->list);

    return UCS_OK;
}
127 
UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t)128 static UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t)
129 {
130     ucs_debug("destroy rc ep %p", self);
131 
132     ucs_list_del(&self->list);
133     uct_rc_ep_pending_purge(&self->super.super, NULL, NULL);
134     uct_rc_fc_cleanup(&self->fc);
135     uct_rc_txqp_cleanup(&self->txqp);
136 }
137 
UCS_CLASS_DEFINE(uct_rc_ep_t,uct_base_ep_t)138 UCS_CLASS_DEFINE(uct_rc_ep_t, uct_base_ep_t)
139 
140 void uct_rc_ep_packet_dump(uct_base_iface_t *iface, uct_am_trace_type_t type,
141                            void *data, size_t length, size_t valid_length,
142                            char *buffer, size_t max)
143 {
144     uct_rc_hdr_t *rch = data;
145     uint8_t fc_hdr    = uct_rc_fc_get_fc_hdr(rch->am_id);
146     uint8_t am_wo_fc;
147 
148     /* Do not invoke AM tracer for auxiliary pure FC_GRANT message */
149     if (fc_hdr != UCT_RC_EP_FC_PURE_GRANT) {
150         am_wo_fc = rch->am_id & ~UCT_RC_EP_FC_MASK; /* mask out FC bits*/
151         snprintf(buffer, max, " %c%c am %d ",
152                  fc_hdr & UCT_RC_EP_FC_FLAG_SOFT_REQ ? 's' :
153                  fc_hdr & UCT_RC_EP_FC_FLAG_HARD_REQ ? 'h' : '-',
154                  fc_hdr & UCT_RC_EP_FC_FLAG_GRANT    ? 'g' : '-',
155                  am_wo_fc);
156         uct_iface_dump_am(iface, type, am_wo_fc, rch + 1, length - sizeof(*rch),
157                           buffer + strlen(buffer), max - strlen(buffer));
158     } else {
159         snprintf(buffer, max, " FC pure grant am ");
160     }
161 }
162 
163 static UCS_F_ALWAYS_INLINE void
uct_rc_op_release_iface_resources(uct_rc_iface_send_op_t * op,int is_get_zcopy)164 uct_rc_op_release_iface_resources(uct_rc_iface_send_op_t *op, int is_get_zcopy)
165 {
166     uct_rc_iface_send_desc_t *desc;
167     uct_rc_iface_t *iface;
168 
169     if (is_get_zcopy) {
170         op->iface->tx.reads_available += op->length;
171         return;
172     }
173 
174     desc  = ucs_derived_of(op, uct_rc_iface_send_desc_t);
175     iface = ucs_container_of(ucs_mpool_obj_owner(desc), uct_rc_iface_t, tx.mp);
176     iface->tx.reads_available += op->length;
177 }
178 
/**
 * Completion handler for a bcopy GET with a user completion.
 *
 * In order: unpacks the fetched data through the user's unpack callback,
 * returns the RDMA-read budget, completes the user request with UCS_OK, and
 * releases the descriptor back to its pool.
 *
 * @param op    Send operation (embedded in a uct_rc_iface_send_desc_t).
 * @param resp  Received data of desc->super.length bytes.
 */
void uct_rc_ep_get_bcopy_handler(uct_rc_iface_send_op_t *op, const void *resp)
{
    uct_rc_iface_send_desc_t *get_desc =
            ucs_derived_of(op, uct_rc_iface_send_desc_t);

    /* The payload was written by the HCA, which valgrind cannot see */
    VALGRIND_MAKE_MEM_DEFINED(resp, get_desc->super.length);
    get_desc->unpack_cb(get_desc->super.unpack_arg, resp,
                        get_desc->super.length);

    uct_rc_op_release_iface_resources(op, 0);
    uct_invoke_completion(get_desc->super.user_comp, UCS_OK);
    ucs_mpool_put(get_desc);
}
191 
/**
 * Completion handler for a bcopy GET without a user completion.
 *
 * Same as uct_rc_ep_get_bcopy_handler() except no user completion is invoked:
 * unpack data, return the RDMA-read budget, release the descriptor.
 *
 * @param op    Send operation (embedded in a uct_rc_iface_send_desc_t).
 * @param resp  Received data of desc->super.length bytes.
 */
void uct_rc_ep_get_bcopy_handler_no_completion(uct_rc_iface_send_op_t *op,
                                               const void *resp)
{
    uct_rc_iface_send_desc_t *get_desc =
            ucs_derived_of(op, uct_rc_iface_send_desc_t);

    VALGRIND_MAKE_MEM_DEFINED(resp, get_desc->super.length);
    get_desc->unpack_cb(get_desc->super.unpack_arg, resp,
                        get_desc->super.length);

    uct_rc_op_release_iface_resources(op, 0);
    ucs_mpool_put(get_desc);
}
203 
/**
 * Completion handler for a zcopy GET.
 *
 * Returns the RDMA-read budget taken by the GET (while op is still valid),
 * then delegates to the generic send-op completion, which completes the user
 * request and releases op.
 */
void uct_rc_ep_get_zcopy_completion_handler(uct_rc_iface_send_op_t *op,
                                            const void *resp)
{
    const int is_get_zcopy = 1;

    uct_rc_op_release_iface_resources(op, is_get_zcopy);
    uct_rc_ep_send_op_completion_handler(op, resp);
}
210 
/**
 * Generic completion handler for send operations.
 *
 * Completes the user request with UCS_OK, then returns op to the interface
 * via uct_rc_iface_put_send_op(). @a resp is not used.
 */
void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
                                          const void *resp)
{
    uct_completion_t *comp = op->user_comp;

    uct_invoke_completion(comp, UCS_OK);
    uct_rc_iface_put_send_op(op);
}
217 
/**
 * Completion handler for flush operations.
 *
 * Completes the user request with UCS_OK and releases op directly to its
 * memory pool (ucs_mpool_put). @a resp is not used.
 */
void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
                                           const void *resp)
{
    uct_completion_t *comp = op->user_comp;

    uct_invoke_completion(comp, UCS_OK);
    ucs_mpool_put(op);
}
224 
uct_rc_ep_pending_add(uct_ep_h tl_ep,uct_pending_req_t * n,unsigned flags)225 ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
226                                    unsigned flags)
227 {
228     uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
229     uct_rc_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_ep_t);
230 
231     if (uct_rc_ep_has_tx_resources(ep) &&
232         uct_rc_iface_has_tx_resources(iface)) {
233         return UCS_ERR_BUSY;
234     }
235 
236     UCS_STATIC_ASSERT(sizeof(uct_pending_req_priv_arb_t) <=
237                       UCT_PENDING_REQ_PRIV_LEN);
238     uct_pending_req_arb_group_push(&ep->arb_group, n);
239     UCT_TL_EP_STAT_PEND(&ep->super);
240 
241     if (uct_rc_ep_has_tx_resources(ep)) {
242         /* If we have ep (but not iface) resources, we need to schedule the ep */
243         ucs_arbiter_group_schedule(&iface->tx.arbiter, &ep->arb_group);
244     }
245 
246     return UCS_OK;
247 }
248 
uct_rc_ep_process_pending(ucs_arbiter_t * arbiter,ucs_arbiter_group_t * group,ucs_arbiter_elem_t * elem,void * arg)249 ucs_arbiter_cb_result_t uct_rc_ep_process_pending(ucs_arbiter_t *arbiter,
250                                                   ucs_arbiter_group_t *group,
251                                                   ucs_arbiter_elem_t *elem,
252                                                   void *arg)
253 {
254     uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
255     uct_rc_iface_t *iface UCS_V_UNUSED;
256     ucs_status_t status;
257     uct_rc_ep_t *ep;
258 
259     ucs_trace_data("progressing pending request %p", req);
260     status = req->func(req);
261     ucs_trace_data("status returned from progress pending: %s",
262                    ucs_status_string(status));
263 
264     if (status == UCS_OK) {
265         return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
266     } else if (status == UCS_INPROGRESS) {
267         return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
268     } else {
269         ep    = ucs_container_of(group, uct_rc_ep_t, arb_group);
270         iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t);
271         if (!uct_rc_iface_has_tx_resources(iface)) {
272             /* No iface resources */
273             return UCS_ARBITER_CB_RESULT_STOP;
274         } else {
275             /* No ep resources */
276             ucs_assertv(!uct_rc_ep_has_tx_resources(ep),
277                         "pending callback returned error but send resources are available");
278             return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
279         }
280     }
281 }
282 
uct_rc_ep_abriter_purge_cb(ucs_arbiter_t * arbiter,ucs_arbiter_group_t * group,ucs_arbiter_elem_t * elem,void * arg)283 static ucs_arbiter_cb_result_t uct_rc_ep_abriter_purge_cb(ucs_arbiter_t *arbiter,
284                                                           ucs_arbiter_group_t *group,
285                                                           ucs_arbiter_elem_t *elem,
286                                                           void *arg)
287 {
288     uct_purge_cb_args_t *cb_args    = arg;
289     uct_pending_purge_callback_t cb = cb_args->cb;
290     uct_pending_req_t *req          = ucs_container_of(elem, uct_pending_req_t,
291                                                        priv);
292     uct_rc_ep_t UCS_V_UNUSED *ep    = ucs_container_of(group, uct_rc_ep_t,
293                                                        arb_group);
294     uct_rc_fc_request_t *freq;
295 
296     /* Invoke user's callback only if it is not internal FC message */
297     if (ucs_likely(req->func != uct_rc_ep_fc_grant)){
298         if (cb != NULL) {
299             cb(req, cb_args->arg);
300         } else {
301             ucs_debug("ep=%p cancelling user pending request %p", ep, req);
302         }
303     } else {
304         freq = ucs_derived_of(req, uct_rc_fc_request_t);
305         ucs_mpool_put(freq);
306     }
307     return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
308 }
309 
/**
 * Remove every pending request queued on the endpoint's arbiter group.
 *
 * @param tl_ep  Endpoint.
 * @param cb     User callback invoked per purged user request; may be NULL.
 * @param arg    Opaque argument forwarded to @a cb.
 */
void uct_rc_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                             void *arg)
{
    uct_rc_ep_t *ep           = ucs_derived_of(tl_ep, uct_rc_ep_t);
    uct_rc_iface_t *iface     = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
    uct_purge_cb_args_t purge = { .cb = cb, .arg = arg };

    ucs_arbiter_group_purge(&iface->tx.arbiter, &ep->arb_group,
                            uct_rc_ep_abriter_purge_cb, &purge);
}
320 
uct_rc_ep_fc_grant(uct_pending_req_t * self)321 ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self)
322 {
323     ucs_status_t status;
324     uct_rc_fc_request_t *freq = ucs_derived_of(self, uct_rc_fc_request_t);
325     uct_rc_ep_t *ep           = ucs_derived_of(freq->ep, uct_rc_ep_t);
326     uct_rc_iface_t *iface     = ucs_derived_of(ep->super.super.iface,
327                                                uct_rc_iface_t);
328 
329     ucs_assert_always(iface->config.fc_enabled);
330     status = uct_rc_fc_ctrl(&ep->super.super, UCT_RC_EP_FC_PURE_GRANT, NULL);
331     if (status == UCS_OK) {
332         UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_TX_PURE_GRANT, 1);
333         ucs_mpool_put(freq);
334     }
335     return status;
336 }
337 
/**
 * Drain and release every operation on the TX QP outstanding queue.
 *
 * Operations whose handler is plain ucs_mpool_put need no completion and are
 * only released. Every other operation is treated as uncompleted:
 *  - a warning is logged when @a is_log is non-zero;
 *  - the user completion (if any) is invoked with @a status;
 *  - GET operations return their RDMA-read budget.
 * Each operation then has its INUSE/ZCOPY flags cleared and is returned to
 * the pool matching its handler.
 *
 * @param txqp    TX QP whose outstanding queue is drained.
 * @param status  Status reported to user completions (e.g. UCS_ERR_CANCELED).
 * @param is_log  Non-zero to warn about each uncompleted operation.
 */
void uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
                                   int is_log)
{
    uct_rc_iface_send_desc_t *send_desc;
    uct_rc_iface_send_op_t *send_op;

    ucs_queue_for_each_extract(send_op, &txqp->outstanding, queue, 1) {
        if (send_op->handler != (uct_rc_send_handler_t)ucs_mpool_put) {
            if (is_log) {
                ucs_warn("destroying rc ep %p with uncompleted operation %p",
                         txqp, send_op);
            }

            /* Handlers that carry a user completion include the GET, flush
             * and atomic handlers (presumably also the send-op and am_zcopy
             * handlers -- verify); complete them with the purge status */
            if (send_op->user_comp != NULL) {
                uct_invoke_completion(send_op->user_comp, status);
            }

            /* Give back the rdma_read budget taken by GET operations */
            if (send_op->handler == uct_rc_ep_get_zcopy_completion_handler) {
                uct_rc_op_release_iface_resources(send_op, 1);
            } else if ((send_op->handler == uct_rc_ep_get_bcopy_handler) ||
                       (send_op->handler ==
                        uct_rc_ep_get_bcopy_handler_no_completion)) {
                uct_rc_op_release_iface_resources(send_op, 0);
            }
        }

        send_op->flags &= ~(UCT_RC_IFACE_SEND_OP_FLAG_INUSE |
                            UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY);

        /* Return the object to the pool it was allocated from */
        if (send_op->handler == uct_rc_ep_flush_op_completion_handler) {
            ucs_mpool_put(send_op);
        } else if ((send_op->handler == uct_rc_ep_send_op_completion_handler) ||
                   (send_op->handler == uct_rc_ep_get_zcopy_completion_handler)) {
            uct_rc_iface_put_send_op(send_op);
        } else {
            /* Everything else is embedded in a send descriptor */
            send_desc = ucs_derived_of(send_op, uct_rc_iface_send_desc_t);
            ucs_mpool_put(send_desc);
        }
    }
}
382 
uct_rc_ep_flush(uct_rc_ep_t * ep,int16_t max_available,unsigned flags)383 ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available,
384                              unsigned flags)
385 {
386     uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface,
387                                            uct_rc_iface_t);
388 
389     if (!uct_rc_iface_has_tx_resources(iface) ||
390         !uct_rc_ep_has_tx_resources(ep)) {
391         return UCS_ERR_NO_RESOURCE;
392     }
393 
394     if (uct_rc_txqp_available(&ep->txqp) == max_available) {
395         UCT_TL_EP_STAT_FLUSH(&ep->super);
396         return UCS_OK;
397     }
398 
399     return UCS_INPROGRESS;
400 }
401 
uct_rc_ep_check_cqe(uct_rc_iface_t * iface,uct_rc_ep_t * ep)402 ucs_status_t uct_rc_ep_check_cqe(uct_rc_iface_t *iface, uct_rc_ep_t *ep)
403 {
404     uct_rc_txqp_t *txqp;
405 
406     if (!uct_rc_iface_have_tx_cqe_avail(iface)) {
407         UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_RC_IFACE_STAT_NO_CQE, 1);
408         UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
409         return UCS_ERR_NO_RESOURCE;
410     }
411 
412     txqp = &ep->txqp;
413     /* if unsignaled == RC_UNSIGNALED_INF this value was already saved and \
414        next operation will be defenitly signaled */
415     if (txqp->unsignaled != RC_UNSIGNALED_INF) {
416         txqp->unsignaled_store_count++;
417         txqp->unsignaled_store += txqp->unsignaled;
418         txqp->unsignaled        = RC_UNSIGNALED_INF;
419     }
420 
421     return UCS_OK;
422 }
423 
424 #define UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(_num_bits, _is_be) \
425     void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(_num_bits, _is_be) \
426             (uct_rc_iface_send_op_t *op, const void *resp) \
427     { \
428         uct_rc_iface_send_desc_t *desc = \
429             ucs_derived_of(op, uct_rc_iface_send_desc_t); \
430         const uint##_num_bits##_t *value = resp; \
431         uint##_num_bits##_t *dest = desc->super.buffer; \
432         \
433         VALGRIND_MAKE_MEM_DEFINED(value, sizeof(*value)); \
434         if (_is_be && (_num_bits == 32)) { \
435             *dest = ntohl(*value); /* TODO swap in-place */ \
436         } else if (_is_be && (_num_bits == 64)) { \
437             *dest = be64toh(*value); \
438         } else { \
439             *dest = *value; \
440         } \
441         \
442         uct_invoke_completion(desc->super.user_comp, UCS_OK); \
443         ucs_mpool_put(desc); \
444   }
445 
446 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(32, 0);
447 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(32, 1);
448 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(64, 0);
449 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(64, 1);
450 
/**
 * Completion handler for zcopy active messages.
 *
 * Completes the user request with UCS_OK and releases the send descriptor
 * back to its pool. @a resp is not used.
 */
void uct_rc_ep_am_zcopy_handler(uct_rc_iface_send_op_t *op, const void *resp)
{
    uct_rc_iface_send_desc_t *am_desc =
            ucs_derived_of(op, uct_rc_iface_send_desc_t);

    uct_invoke_completion(am_desc->super.user_comp, UCS_OK);
    ucs_mpool_put(am_desc);
}
457