/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
* Copyright (c) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
7
8 #ifdef HAVE_CONFIG_H
9 # include "config.h"
10 #endif
11
12 #include "rc_ep.h"
13 #include "rc_iface.h"
14
15 #include <uct/ib/base/ib_verbs.h>
16 #include <ucs/debug/memtrack.h>
17 #include <ucs/debug/log.h>
18 #include <ucs/type/class.h>
19 #include <endian.h>
20
#ifdef ENABLE_STATS
/* Counter names for the flow-control statistics node attached to each ep */
static ucs_stats_class_t uct_rc_fc_stats_class = {
    .name          = "rc_fc",
    .num_counters  = UCT_RC_FC_STAT_LAST,
    .counter_names = {
        [UCT_RC_FC_STAT_NO_CRED]       = "no_cred",
        [UCT_RC_FC_STAT_TX_GRANT]      = "tx_grant",
        [UCT_RC_FC_STAT_TX_PURE_GRANT] = "tx_pure_grant",
        [UCT_RC_FC_STAT_TX_SOFT_REQ]   = "tx_soft_req",
        [UCT_RC_FC_STAT_TX_HARD_REQ]   = "tx_hard_req",
        [UCT_RC_FC_STAT_RX_GRANT]      = "rx_grant",
        [UCT_RC_FC_STAT_RX_PURE_GRANT] = "rx_pure_grant",
        [UCT_RC_FC_STAT_RX_SOFT_REQ]   = "rx_soft_req",
        [UCT_RC_FC_STAT_RX_HARD_REQ]   = "rx_hard_req",
        [UCT_RC_FC_STAT_FC_WND]        = "fc_wnd"
    }
};

/* Counter names for the per-QP TX statistics node */
static ucs_stats_class_t uct_rc_txqp_stats_class = {
    .name          = "rc_txqp",
    .num_counters  = UCT_RC_TXQP_STAT_LAST,
    .counter_names = {
        [UCT_RC_TXQP_STAT_QP_FULL] = "qp_full",
        [UCT_RC_TXQP_STAT_SIGNAL]  = "signal"
    }
};
#endif
48
uct_rc_txqp_init(uct_rc_txqp_t * txqp,uct_rc_iface_t * iface,uint32_t qp_num UCS_STATS_ARG (ucs_stats_node_t * stats_parent))49 ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface,
50 uint32_t qp_num
51 UCS_STATS_ARG(ucs_stats_node_t* stats_parent))
52 {
53 txqp->unsignaled = 0;
54 txqp->unsignaled_store = 0;
55 txqp->unsignaled_store_count = 0;
56 txqp->available = 0;
57 ucs_queue_head_init(&txqp->outstanding);
58
59 return UCS_STATS_NODE_ALLOC(&txqp->stats, &uct_rc_txqp_stats_class,
60 stats_parent, "-0x%x", qp_num);
61 }
62
/* Cancel every outstanding operation on the txqp and free its stats node */
void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp)
{
    /* is_log=1: warn about operations that never completed */
    uct_rc_txqp_purge_outstanding(txqp, UCS_ERR_CANCELED, 1);
    UCS_STATS_NODE_FREE(txqp->stats);
}
68
uct_rc_fc_init(uct_rc_fc_t * fc,int16_t winsize UCS_STATS_ARG (ucs_stats_node_t * stats_parent))69 ucs_status_t uct_rc_fc_init(uct_rc_fc_t *fc, int16_t winsize
70 UCS_STATS_ARG(ucs_stats_node_t* stats_parent))
71 {
72 ucs_status_t status;
73
74 fc->fc_wnd = winsize;
75 fc->flags = 0;
76
77 status = UCS_STATS_NODE_ALLOC(&fc->stats, &uct_rc_fc_stats_class,
78 stats_parent);
79 if (status != UCS_OK) {
80 return status;
81 }
82
83 UCS_STATS_SET_COUNTER(fc->stats, UCT_RC_FC_STAT_FC_WND, fc->fc_wnd);
84
85 return UCS_OK;
86 }
87
/* Release the flow-control statistics node */
void uct_rc_fc_cleanup(uct_rc_fc_t *fc)
{
    UCS_STATS_NODE_FREE(fc->stats);
}
92
/* Endpoint constructor: sets up the TX QP state, path index, flow control,
 * arbiter group, and registers the ep on the iface endpoint list. */
UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface, uint32_t qp_num,
                    const uct_ep_params_t *params)
{
    ucs_status_t status;

    /* Check that FC protocol fits AM id
     * (just in case AM id space gets extended) */
    UCS_STATIC_ASSERT(UCT_RC_EP_FC_MASK < UINT8_MAX);

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

    status = uct_rc_txqp_init(&self->txqp, iface, qp_num
                              UCS_STATS_ARG(self->super.stats));
    if (status != UCS_OK) {
        return status;
    }

    self->path_index = UCT_EP_PARAMS_GET_PATH_INDEX(params);

    status = uct_rc_fc_init(&self->fc, iface->config.fc_wnd_size
                            UCS_STATS_ARG(self->super.stats));
    if (status != UCS_OK) {
        /* Undo the txqp initialization done above */
        uct_rc_txqp_cleanup(&self->txqp);
        return status;
    }

    ucs_arbiter_group_init(&self->arb_group);
    ucs_list_add_head(&iface->ep_list, &self->list);
    return UCS_OK;
}
127
UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t)128 static UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t)
129 {
130 ucs_debug("destroy rc ep %p", self);
131
132 ucs_list_del(&self->list);
133 uct_rc_ep_pending_purge(&self->super.super, NULL, NULL);
134 uct_rc_fc_cleanup(&self->fc);
135 uct_rc_txqp_cleanup(&self->txqp);
136 }
137
UCS_CLASS_DEFINE(uct_rc_ep_t,uct_base_ep_t)138 UCS_CLASS_DEFINE(uct_rc_ep_t, uct_base_ep_t)
139
140 void uct_rc_ep_packet_dump(uct_base_iface_t *iface, uct_am_trace_type_t type,
141 void *data, size_t length, size_t valid_length,
142 char *buffer, size_t max)
143 {
144 uct_rc_hdr_t *rch = data;
145 uint8_t fc_hdr = uct_rc_fc_get_fc_hdr(rch->am_id);
146 uint8_t am_wo_fc;
147
148 /* Do not invoke AM tracer for auxiliary pure FC_GRANT message */
149 if (fc_hdr != UCT_RC_EP_FC_PURE_GRANT) {
150 am_wo_fc = rch->am_id & ~UCT_RC_EP_FC_MASK; /* mask out FC bits*/
151 snprintf(buffer, max, " %c%c am %d ",
152 fc_hdr & UCT_RC_EP_FC_FLAG_SOFT_REQ ? 's' :
153 fc_hdr & UCT_RC_EP_FC_FLAG_HARD_REQ ? 'h' : '-',
154 fc_hdr & UCT_RC_EP_FC_FLAG_GRANT ? 'g' : '-',
155 am_wo_fc);
156 uct_iface_dump_am(iface, type, am_wo_fc, rch + 1, length - sizeof(*rch),
157 buffer + strlen(buffer), max - strlen(buffer));
158 } else {
159 snprintf(buffer, max, " FC pure grant am ");
160 }
161 }
162
163 static UCS_F_ALWAYS_INLINE void
uct_rc_op_release_iface_resources(uct_rc_iface_send_op_t * op,int is_get_zcopy)164 uct_rc_op_release_iface_resources(uct_rc_iface_send_op_t *op, int is_get_zcopy)
165 {
166 uct_rc_iface_send_desc_t *desc;
167 uct_rc_iface_t *iface;
168
169 if (is_get_zcopy) {
170 op->iface->tx.reads_available += op->length;
171 return;
172 }
173
174 desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);
175 iface = ucs_container_of(ucs_mpool_obj_owner(desc), uct_rc_iface_t, tx.mp);
176 iface->tx.reads_available += op->length;
177 }
178
/* Completion of a get-bcopy: unpack the fetched data into the user buffer,
 * return the read credits, invoke the user completion and recycle the
 * descriptor. */
void uct_rc_ep_get_bcopy_handler(uct_rc_iface_send_op_t *op, const void *resp)
{
    uct_rc_iface_send_desc_t *send_desc =
            ucs_derived_of(op, uct_rc_iface_send_desc_t);

    VALGRIND_MAKE_MEM_DEFINED(resp, send_desc->super.length);

    send_desc->unpack_cb(send_desc->super.unpack_arg, resp,
                         send_desc->super.length);

    uct_rc_op_release_iface_resources(op, 0);
    uct_invoke_completion(send_desc->super.user_comp, UCS_OK);
    ucs_mpool_put(send_desc);
}
191
/* Same as uct_rc_ep_get_bcopy_handler, but without a user completion to
 * invoke. */
void uct_rc_ep_get_bcopy_handler_no_completion(uct_rc_iface_send_op_t *op,
                                               const void *resp)
{
    uct_rc_iface_send_desc_t *send_desc =
            ucs_derived_of(op, uct_rc_iface_send_desc_t);

    VALGRIND_MAKE_MEM_DEFINED(resp, send_desc->super.length);

    send_desc->unpack_cb(send_desc->super.unpack_arg, resp,
                         send_desc->super.length);
    uct_rc_op_release_iface_resources(op, 0);
    ucs_mpool_put(send_desc);
}
203
/* Completion of a get-zcopy: return the read credits, then finish like any
 * other send operation. */
void uct_rc_ep_get_zcopy_completion_handler(uct_rc_iface_send_op_t *op,
                                            const void *resp)
{
    uct_rc_op_release_iface_resources(op, 1);
    uct_rc_ep_send_op_completion_handler(op, resp);
}
210
/* Generic send completion: notify the user and return the op to the iface
 * send-op freelist. */
void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
                                          const void *resp)
{
    uct_invoke_completion(op->user_comp, UCS_OK);
    uct_rc_iface_put_send_op(op);
}
217
/* Flush completion: notify the user and return the op to its memory pool
 * (flush ops are mpool-allocated, unlike regular send ops). */
void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
                                           const void *resp)
{
    uct_invoke_completion(op->user_comp, UCS_OK);
    ucs_mpool_put(op);
}
224
uct_rc_ep_pending_add(uct_ep_h tl_ep,uct_pending_req_t * n,unsigned flags)225 ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
226 unsigned flags)
227 {
228 uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
229 uct_rc_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_ep_t);
230
231 if (uct_rc_ep_has_tx_resources(ep) &&
232 uct_rc_iface_has_tx_resources(iface)) {
233 return UCS_ERR_BUSY;
234 }
235
236 UCS_STATIC_ASSERT(sizeof(uct_pending_req_priv_arb_t) <=
237 UCT_PENDING_REQ_PRIV_LEN);
238 uct_pending_req_arb_group_push(&ep->arb_group, n);
239 UCT_TL_EP_STAT_PEND(&ep->super);
240
241 if (uct_rc_ep_has_tx_resources(ep)) {
242 /* If we have ep (but not iface) resources, we need to schedule the ep */
243 ucs_arbiter_group_schedule(&iface->tx.arbiter, &ep->arb_group);
244 }
245
246 return UCS_OK;
247 }
248
uct_rc_ep_process_pending(ucs_arbiter_t * arbiter,ucs_arbiter_group_t * group,ucs_arbiter_elem_t * elem,void * arg)249 ucs_arbiter_cb_result_t uct_rc_ep_process_pending(ucs_arbiter_t *arbiter,
250 ucs_arbiter_group_t *group,
251 ucs_arbiter_elem_t *elem,
252 void *arg)
253 {
254 uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
255 uct_rc_iface_t *iface UCS_V_UNUSED;
256 ucs_status_t status;
257 uct_rc_ep_t *ep;
258
259 ucs_trace_data("progressing pending request %p", req);
260 status = req->func(req);
261 ucs_trace_data("status returned from progress pending: %s",
262 ucs_status_string(status));
263
264 if (status == UCS_OK) {
265 return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
266 } else if (status == UCS_INPROGRESS) {
267 return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
268 } else {
269 ep = ucs_container_of(group, uct_rc_ep_t, arb_group);
270 iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t);
271 if (!uct_rc_iface_has_tx_resources(iface)) {
272 /* No iface resources */
273 return UCS_ARBITER_CB_RESULT_STOP;
274 } else {
275 /* No ep resources */
276 ucs_assertv(!uct_rc_ep_has_tx_resources(ep),
277 "pending callback returned error but send resources are available");
278 return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
279 }
280 }
281 }
282
uct_rc_ep_abriter_purge_cb(ucs_arbiter_t * arbiter,ucs_arbiter_group_t * group,ucs_arbiter_elem_t * elem,void * arg)283 static ucs_arbiter_cb_result_t uct_rc_ep_abriter_purge_cb(ucs_arbiter_t *arbiter,
284 ucs_arbiter_group_t *group,
285 ucs_arbiter_elem_t *elem,
286 void *arg)
287 {
288 uct_purge_cb_args_t *cb_args = arg;
289 uct_pending_purge_callback_t cb = cb_args->cb;
290 uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t,
291 priv);
292 uct_rc_ep_t UCS_V_UNUSED *ep = ucs_container_of(group, uct_rc_ep_t,
293 arb_group);
294 uct_rc_fc_request_t *freq;
295
296 /* Invoke user's callback only if it is not internal FC message */
297 if (ucs_likely(req->func != uct_rc_ep_fc_grant)){
298 if (cb != NULL) {
299 cb(req, cb_args->arg);
300 } else {
301 ucs_debug("ep=%p cancelling user pending request %p", ep, req);
302 }
303 } else {
304 freq = ucs_derived_of(req, uct_rc_fc_request_t);
305 ucs_mpool_put(freq);
306 }
307 return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
308 }
309
/* Remove all pending requests of this ep from the iface arbiter, invoking
 * @a cb (with @a arg) on each user request. */
void uct_rc_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                             void *arg)
{
    uct_rc_ep_t *ep          = ucs_derived_of(tl_ep, uct_rc_ep_t);
    uct_rc_iface_t *iface    = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
    uct_purge_cb_args_t args = {cb, arg};

    ucs_arbiter_group_purge(&iface->tx.arbiter, &ep->arb_group,
                            uct_rc_ep_abriter_purge_cb, &args);
}
320
uct_rc_ep_fc_grant(uct_pending_req_t * self)321 ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self)
322 {
323 ucs_status_t status;
324 uct_rc_fc_request_t *freq = ucs_derived_of(self, uct_rc_fc_request_t);
325 uct_rc_ep_t *ep = ucs_derived_of(freq->ep, uct_rc_ep_t);
326 uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface,
327 uct_rc_iface_t);
328
329 ucs_assert_always(iface->config.fc_enabled);
330 status = uct_rc_fc_ctrl(&ep->super.super, UCT_RC_EP_FC_PURE_GRANT, NULL);
331 if (status == UCS_OK) {
332 UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_TX_PURE_GRANT, 1);
333 ucs_mpool_put(freq);
334 }
335 return status;
336 }
337
/**
 * Drain every outstanding send operation on @a txqp, completing user
 * requests with @a status and returning each op/descriptor to its pool.
 *
 * @param txqp    TX queue-pair whose outstanding queue is drained.
 * @param status  Status delivered to pending user completions.
 * @param is_log  If nonzero, warn about operations that never completed.
 */
void uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
                                   int is_log)
{
    uct_rc_iface_send_op_t *op;
    uct_rc_iface_send_desc_t *desc;

    ucs_queue_for_each_extract(op, &txqp->outstanding, queue, 1) {
        if (op->handler != (uct_rc_send_handler_t)ucs_mpool_put) {
            if (is_log != 0) {
                /* Fix: the logged pointer is the txqp, not the ep, so say so */
                ucs_warn("destroying rc txqp %p with uncompleted operation %p",
                         txqp, op);
            }

            if (op->user_comp != NULL) {
                /* This must be uct_rc_ep_get_bcopy_handler,
                 * uct_rc_ep_get_bcopy_handler_no_completion,
                 * uct_rc_ep_get_zcopy_completion_handler,
                 * uct_rc_ep_flush_op_completion_handler or
                 * one of the atomic handlers,
                 * so invoke user completion */
                uct_invoke_completion(op->user_comp, status);
            }

            /* Need to release rdma_read resources taken by get operations */
            if ((op->handler == uct_rc_ep_get_bcopy_handler) ||
                (op->handler == uct_rc_ep_get_bcopy_handler_no_completion)) {
                uct_rc_op_release_iface_resources(op, 0);
            } else if (op->handler == uct_rc_ep_get_zcopy_completion_handler) {
                uct_rc_op_release_iface_resources(op, 1);
            }
        }
        op->flags &= ~(UCT_RC_IFACE_SEND_OP_FLAG_INUSE |
                       UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY);
        /* Return the operation to whichever pool owns it */
        if ((op->handler == uct_rc_ep_send_op_completion_handler) ||
            (op->handler == uct_rc_ep_get_zcopy_completion_handler)) {
            uct_rc_iface_put_send_op(op);
        } else if (op->handler == uct_rc_ep_flush_op_completion_handler) {
            ucs_mpool_put(op);
        } else {
            desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);
            ucs_mpool_put(desc);
        }
    }
}
382
uct_rc_ep_flush(uct_rc_ep_t * ep,int16_t max_available,unsigned flags)383 ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available,
384 unsigned flags)
385 {
386 uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface,
387 uct_rc_iface_t);
388
389 if (!uct_rc_iface_has_tx_resources(iface) ||
390 !uct_rc_ep_has_tx_resources(ep)) {
391 return UCS_ERR_NO_RESOURCE;
392 }
393
394 if (uct_rc_txqp_available(&ep->txqp) == max_available) {
395 UCT_TL_EP_STAT_FLUSH(&ep->super);
396 return UCS_OK;
397 }
398
399 return UCS_INPROGRESS;
400 }
401
uct_rc_ep_check_cqe(uct_rc_iface_t * iface,uct_rc_ep_t * ep)402 ucs_status_t uct_rc_ep_check_cqe(uct_rc_iface_t *iface, uct_rc_ep_t *ep)
403 {
404 uct_rc_txqp_t *txqp;
405
406 if (!uct_rc_iface_have_tx_cqe_avail(iface)) {
407 UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_RC_IFACE_STAT_NO_CQE, 1);
408 UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
409 return UCS_ERR_NO_RESOURCE;
410 }
411
412 txqp = &ep->txqp;
413 /* if unsignaled == RC_UNSIGNALED_INF this value was already saved and \
414 next operation will be defenitly signaled */
415 if (txqp->unsignaled != RC_UNSIGNALED_INF) {
416 txqp->unsignaled_store_count++;
417 txqp->unsignaled_store += txqp->unsignaled;
418 txqp->unsignaled = RC_UNSIGNALED_INF;
419 }
420
421 return UCS_OK;
422 }
423
424 #define UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(_num_bits, _is_be) \
425 void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(_num_bits, _is_be) \
426 (uct_rc_iface_send_op_t *op, const void *resp) \
427 { \
428 uct_rc_iface_send_desc_t *desc = \
429 ucs_derived_of(op, uct_rc_iface_send_desc_t); \
430 const uint##_num_bits##_t *value = resp; \
431 uint##_num_bits##_t *dest = desc->super.buffer; \
432 \
433 VALGRIND_MAKE_MEM_DEFINED(value, sizeof(*value)); \
434 if (_is_be && (_num_bits == 32)) { \
435 *dest = ntohl(*value); /* TODO swap in-place */ \
436 } else if (_is_be && (_num_bits == 64)) { \
437 *dest = be64toh(*value); \
438 } else { \
439 *dest = *value; \
440 } \
441 \
442 uct_invoke_completion(desc->super.user_comp, UCS_OK); \
443 ucs_mpool_put(desc); \
444 }
445
446 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(32, 0);
447 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(32, 1);
448 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(64, 0);
449 UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(64, 1);
450
/* Completion of an AM zcopy send: invoke the user completion and recycle
 * the descriptor. */
void uct_rc_ep_am_zcopy_handler(uct_rc_iface_send_op_t *op, const void *resp)
{
    uct_rc_iface_send_desc_t *send_desc =
            ucs_derived_of(op, uct_rc_iface_send_desc_t);

    uct_invoke_completion(send_desc->super.user_comp, UCS_OK);
    ucs_mpool_put(send_desc);
}
457