/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "ucp_context.h"
#include "ucp_worker.h"
#include "ucp_request.inl"

#include <ucp/proto/proto_am.h>

#include <ucs/datastruct/mpool.inl>
#include <ucs/debug/debug.h>
#include <ucs/debug/log.h>


const ucp_request_param_t ucp_request_null_param = { .op_attr_mask = 0 };

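/*
 * The handle returned to the application points just past the internal
 * ucp_request_t descriptor, hence the "(ucp_request_t*)request - 1"
 * arithmetic throughout this file (and the "req + 1" passed to user
 * callbacks below).
 *
 * A minimal polling sketch from the application side (hypothetical user
 * code, not part of this file):
 *
 *   void *req = ucp_tag_recv_nb(worker, buf, count, dt, tag, mask, cb);
 *   if (UCS_PTR_IS_PTR(req)) {
 *       while (ucp_request_check_status(req) == UCS_INPROGRESS) {
 *           ucp_worker_progress(worker);
 *       }
 *       ucp_request_free(req);
 *   }
 */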
int ucp_request_is_completed(void *request)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;
    return !!(req->flags & UCP_REQUEST_FLAG_COMPLETED);
}

ucs_status_t ucp_request_check_status(void *request)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;

    if (req->flags & UCP_REQUEST_FLAG_COMPLETED) {
        ucs_assert(req->status != UCS_INPROGRESS);
        return req->status;
    }
    return UCS_INPROGRESS;
}

ucs_status_t ucp_tag_recv_request_test(void *request, ucp_tag_recv_info_t *info)
{
    ucp_request_t *req   = (ucp_request_t*)request - 1;
    ucs_status_t  status = ucp_request_check_status(request);

    if (status != UCS_INPROGRESS) {
        ucs_assert(req->flags & UCP_REQUEST_FLAG_RECV);
        *info = req->recv.tag.info;
    }

    return status;
}

ucs_status_t ucp_stream_recv_request_test(void *request, size_t *length_p)
{
    ucp_request_t *req   = (ucp_request_t*)request - 1;
    ucs_status_t  status = ucp_request_check_status(request);

    if (status != UCS_INPROGRESS) {
        ucs_assert(req->flags & UCP_REQUEST_FLAG_STREAM_RECV);
        *length_p = req->recv.stream.length;
    }

    return status;
}

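/*
 * Common path for ucp_request_release() and ucp_request_free(): a
 * completed request is returned to the worker's memory pool right away;
 * otherwise it is only marked as released ("cb_flag" additionally clears
 * the completion-callback flag for ucp_request_free()) and is recycled
 * when it eventually completes.
 */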
static UCS_F_ALWAYS_INLINE void
ucp_request_release_common(void *request, uint8_t cb_flag, const char *debug_name)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;
    ucp_worker_h UCS_V_UNUSED worker = ucs_container_of(ucs_mpool_obj_owner(req),
                                                        ucp_worker_t, req_mp);
    uint32_t flags;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    flags = req->flags;
    ucs_trace_req("%s request %p (%p) "UCP_REQUEST_FLAGS_FMT, debug_name,
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(flags));

    ucs_assert(!(flags & UCP_REQUEST_DEBUG_FLAG_EXTERNAL));
    ucs_assert(!(flags & UCP_REQUEST_FLAG_RELEASED));

    if (ucs_likely(flags & UCP_REQUEST_FLAG_COMPLETED)) {
        ucp_request_put(req);
    } else {
        req->flags = (flags | UCP_REQUEST_FLAG_RELEASED) & ~cb_flag;
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}

UCS_PROFILE_FUNC_VOID(ucp_request_release, (request), void *request)
{
    /* mark request as released */
    ucp_request_release_common(request, 0, "release");
}

UCS_PROFILE_FUNC_VOID(ucp_request_free, (request), void *request)
{
    /* mark request as released and disable the callback */
    ucp_request_release_common(request, UCP_REQUEST_FLAG_CALLBACK, "free");
}

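/* Always returns NULL; allocating requests externally through this entry
 * point is not supported. */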
UCS_PROFILE_FUNC(void*, ucp_request_alloc,
                 (worker),
                 ucp_worker_h worker)
{
    return NULL;
}

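/*
 * Cancel a receive request. Only expected tag receives can be canceled
 * here: the request is removed from the expected queue and completed
 * with UCS_ERR_CANCELED, unless it was offloaded to the transport, in
 * which case completion has to come from the transport itself.
 */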
UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request),
                      ucp_worker_h worker, void *request)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;
    int removed;

    if (req->flags & UCP_REQUEST_FLAG_COMPLETED) {
        return;
    }

    if (req->flags & UCP_REQUEST_FLAG_EXPECTED) {
        UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

        removed = ucp_tag_exp_remove(&worker->tm, req);
        /* If the tag was posted to the transport, we have to wait for its
         * completion */
        if (removed && !(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) {
            ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED);
        }

        UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
    }
}

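/*
 * Memory-pool proxies that run the user-supplied request init/cleanup
 * hooks (ucp_params_t::request_init / request_cleanup) on the
 * user-visible part of the request, i.e. on "req + 1".
 */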
static void ucp_worker_request_init_proxy(ucs_mpool_t *mp, void *obj, void *chunk)
{
    ucp_worker_h worker = ucs_container_of(mp, ucp_worker_t, req_mp);
    ucp_context_h context = worker->context;
    ucp_request_t *req = obj;

    if (context->config.request.init != NULL) {
        context->config.request.init(req + 1);
    }
}

static void ucp_worker_request_fini_proxy(ucs_mpool_t *mp, void *obj)
{
    ucp_worker_h worker = ucs_container_of(mp, ucp_worker_t, req_mp);
    ucp_context_h context = worker->context;
    ucp_request_t *req = obj;

    if (context->config.request.cleanup != NULL) {
        context->config.request.cleanup(req + 1);
    }
}

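/* The request pool is backed by hugetlb chunks (presumably falling back
 * to regular allocation when hugetlb is unavailable) and runs the
 * proxies above on each object; the pool below, used for rendezvous GET
 * fragments, needs no per-object hooks. */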
ucs_mpool_ops_t ucp_request_mpool_ops = {
    .chunk_alloc   = ucs_mpool_hugetlb_malloc,
    .chunk_release = ucs_mpool_hugetlb_free,
    .obj_init      = ucp_worker_request_init_proxy,
    .obj_cleanup   = ucp_worker_request_fini_proxy
};

ucs_mpool_ops_t ucp_rndv_get_mpool_ops = {
    .chunk_alloc   = ucs_mpool_chunk_malloc,
    .chunk_release = ucs_mpool_chunk_free,
    .obj_init      = NULL,
    .obj_cleanup   = NULL
};

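/*
 * Try to queue the request on the pending queue of its send lane.
 * Returns 1 if it was queued (and sets *req_status to UCS_INPROGRESS),
 * 0 if the transport returned UCS_ERR_BUSY and the caller should retry
 * the send; any other status from uct_ep_pending_add() is fatal.
 */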
int ucp_request_pending_add(ucp_request_t *req, ucs_status_t *req_status,
                            unsigned pending_flags)
{
    ucs_status_t status;
    uct_ep_h uct_ep;

    ucs_assertv(req->send.lane != UCP_NULL_LANE, "%s() did not set req->send.lane",
                ucs_debug_get_symbol_name(req->send.uct.func));

    uct_ep = req->send.ep->uct_eps[req->send.lane];
    status = uct_ep_pending_add(uct_ep, &req->send.uct, pending_flags);
    if (status == UCS_OK) {
        ucs_trace_data("ep %p: added pending uct request %p to lane[%d]=%p",
                       req->send.ep, req, req->send.lane, uct_ep);
        *req_status            = UCS_INPROGRESS;
        req->send.pending_lane = req->send.lane;
        return 1;
    } else if (status == UCS_ERR_BUSY) {
        /* Could not add to pending; the caller should retry the send */
        return 0;
    }

    /* Unexpected error while adding to pending */
    ucs_fatal("invalid return status from uct_ep_pending_add(): %s",
              ucs_status_string(status));
}

static void ucp_request_dt_dereg(ucp_context_t *context, ucp_dt_reg_t *dt_reg,
                                 size_t count, ucp_request_t *req_dbg)
{
    size_t i;

    for (i = 0; i < count; ++i) {
        ucp_trace_req(req_dbg, "mem dereg buffer %zu/%zu md_map 0x%"PRIx64,
                      i, count, dt_reg[i].md_map);
        ucp_mem_rereg_mds(context, 0, NULL, 0, 0, NULL, UCS_MEMORY_TYPE_HOST, NULL,
                          dt_reg[i].memh, &dt_reg[i].md_map);
        ucs_assert(dt_reg[i].md_map == 0);
    }
}

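/*
 * Register the buffer on the memory domains in "md_map". A contiguous
 * datatype registers the buffer directly into the request state; an IOV
 * datatype allocates (or reuses) a per-entry registration array,
 * registers each non-empty entry, and rolls back earlier registrations
 * on failure.
 */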
UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
                 (context, md_map, buffer, length, datatype, state, mem_type, req_dbg, uct_flags),
                 ucp_context_t *context, ucp_md_map_t md_map, void *buffer,
                 size_t length, ucp_datatype_t datatype, ucp_dt_state_t *state,
                 ucs_memory_type_t mem_type, ucp_request_t *req_dbg, unsigned uct_flags)
{
    size_t iov_it, iovcnt;
    const ucp_dt_iov_t *iov;
    ucp_dt_reg_t *dt_reg;
    ucs_status_t status;
    int flags;
    int level;

    ucs_trace_func("context=%p md_map=0x%lx buffer=%p length=%zu datatype=0x%lx "
                   "state=%p", context, md_map, buffer, length, datatype, state);

    status = UCS_OK;
    flags  = UCT_MD_MEM_ACCESS_RMA | uct_flags;
    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        ucs_assert(ucs_popcount(md_map) <= UCP_MAX_OP_MDS);
        status = ucp_mem_rereg_mds(context, md_map, buffer, length, flags,
                                   NULL, mem_type, NULL, state->dt.contig.memh,
                                   &state->dt.contig.md_map);
        ucp_trace_req(req_dbg, "mem reg md_map 0x%"PRIx64"/0x%"PRIx64,
                      state->dt.contig.md_map, md_map);
        break;
    case UCP_DATATYPE_IOV:
        iovcnt = state->dt.iov.iovcnt;
        iov    = buffer;
        dt_reg = ((state->dt.iov.dt_reg == NULL) ?
                  ucs_calloc(iovcnt, sizeof(*dt_reg), "iov_dt_reg") :
                  state->dt.iov.dt_reg);
        if (NULL == dt_reg) {
            status = UCS_ERR_NO_MEMORY;
            goto err;
        }
        for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
            if (iov[iov_it].length) {
                status = ucp_mem_rereg_mds(context, md_map, iov[iov_it].buffer,
                                           iov[iov_it].length, flags, NULL,
                                           mem_type, NULL, dt_reg[iov_it].memh,
                                           &dt_reg[iov_it].md_map);
                if (status != UCS_OK) {
                    /* unregister previously registered memory */
                    ucp_request_dt_dereg(context, dt_reg, iov_it, req_dbg);
                    ucs_free(dt_reg);
                    goto err;
                }
                ucp_trace_req(req_dbg,
                              "mem reg iov %zu/%zu md_map 0x%"PRIx64"/0x%"PRIx64,
                              iov_it, iovcnt, dt_reg[iov_it].md_map, md_map);
            }
        }
        state->dt.iov.dt_reg = dt_reg;
        break;
    default:
        status = UCS_ERR_INVALID_PARAM;
        ucs_error("Invalid data type 0x%lx", datatype);
    }

err:
    if (status != UCS_OK) {
        level = (flags & UCT_MD_MEM_FLAG_HIDE_ERRORS) ?
                UCS_LOG_LEVEL_DEBUG : UCS_LOG_LEVEL_ERROR;
        ucs_log(level,
                "failed to register user buffer datatype 0x%lx address %p len %zu:"
                " %s", datatype, buffer, length, ucs_status_string(status));
    }
    return status;
}

UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg, (context, datatype, state, req_dbg),
                      ucp_context_t *context, ucp_datatype_t datatype,
                      ucp_dt_state_t *state, ucp_request_t *req_dbg)
{
    ucs_trace_func("context=%p datatype=0x%lx state=%p", context, datatype,
                   state);

    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        ucp_request_dt_dereg(context, &state->dt.contig, 1, req_dbg);
        break;
    case UCP_DATATYPE_IOV:
        if (state->dt.iov.dt_reg != NULL) {
            ucp_request_dt_dereg(context, state->dt.iov.dt_reg,
                                 state->dt.iov.iovcnt, req_dbg);
            ucs_free(state->dt.iov.dt_reg);
            state->dt.iov.dt_reg = NULL;
        }
        break;
    default:
        break;
    }
}

/* NOTE: deprecated */
ucs_status_t ucp_request_test(void *request, ucp_tag_recv_info_t *info)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;

    if (req->flags & UCP_REQUEST_FLAG_COMPLETED) {
        if (req->flags & UCP_REQUEST_FLAG_RECV) {
            *info = req->recv.tag.info;
        }
        ucs_assert(req->status != UCS_INPROGRESS);
        return req->status;
    }
    return UCS_INPROGRESS;
}

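/*
 * Initialize a request for a multi-fragment protocol: "message_id" is
 * taken from a per-worker counter (presumably so the receiver can match
 * fragments of the same message) and "am_bw_index" starts at the first
 * bandwidth lane.
 */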
static UCS_F_ALWAYS_INLINE
void ucp_request_init_multi_proto(ucp_request_t *req,
                                  uct_pending_callback_t multi_func,
                                  const char *multi_func_str)
{
    req->send.uct.func = multi_func;

    if (req->flags & (UCP_REQUEST_FLAG_SEND_TAG |
                      UCP_REQUEST_FLAG_SEND_AM)) {
        req->send.msg_proto.message_id  = req->send.ep->worker->am_message_id++;
        req->send.msg_proto.am_bw_index = 0;
    }

    req->send.pending_lane = UCP_NULL_LANE;
    UCS_PROFILE_REQUEST_EVENT(req, multi_func_str, req->send.length);
}

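/*
 * Pick the eager protocol for a send request by length: at most
 * "max_short" bytes go as a short message, below "zcopy_thresh" as
 * buffered copy (single fragment if it fits into max_bcopy minus the
 * header, otherwise multi), and below "zcopy_max" as zero-copy with the
 * buffer registered on the send lane. Anything longer returns
 * UCS_ERR_NO_PROGRESS so the caller can fall back to another protocol
 * (typically rendezvous).
 */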
ucs_status_t
ucp_request_send_start(ucp_request_t *req, ssize_t max_short,
                       size_t zcopy_thresh, size_t zcopy_max, size_t dt_count,
                       const ucp_ep_msg_config_t* msg_config,
                       const ucp_request_send_proto_t *proto)
{
    size_t       length = req->send.length;
    ucs_status_t status;
    int          multi;

    if ((ssize_t)length <= max_short) {
        /* short */
        req->send.uct.func = proto->contig_short;
        UCS_PROFILE_REQUEST_EVENT(req, "start_contig_short", req->send.length);
        return UCS_OK;
    } else if (length < zcopy_thresh) {
        /* bcopy */
        ucp_request_send_state_reset(req, NULL, UCP_REQUEST_SEND_PROTO_BCOPY_AM);
        ucs_assert(msg_config->max_bcopy >= proto->only_hdr_size);
        if (length <= (msg_config->max_bcopy - proto->only_hdr_size)) {
            req->send.uct.func = proto->bcopy_single;
            UCS_PROFILE_REQUEST_EVENT(req, "start_bcopy_single", req->send.length);
        } else {
            ucp_request_init_multi_proto(req, proto->bcopy_multi,
                                         "start_bcopy_multi");
        }
        return UCS_OK;
    } else if (length < zcopy_max) {
        /* zcopy */
        ucp_request_send_state_reset(req, proto->zcopy_completion,
                                     UCP_REQUEST_SEND_PROTO_ZCOPY_AM);
        status = ucp_request_send_buffer_reg_lane(req, req->send.lane, 0);
        if (status != UCS_OK) {
            return status;
        }

        if (ucs_unlikely(length > msg_config->max_zcopy - proto->only_hdr_size)) {
            multi = 1;
        } else if (ucs_unlikely(UCP_DT_IS_IOV(req->send.datatype))) {
            if (dt_count <= msg_config->max_iov) {
                multi = 0;
            } else {
                multi = ucp_dt_iov_count_nonempty(req->send.buffer, dt_count) >
                        msg_config->max_iov;
            }
        } else {
            multi = 0;
        }

        if (multi) {
            ucp_request_init_multi_proto(req, proto->zcopy_multi,
                                         "start_zcopy_multi");
        } else {
            req->send.uct.func = proto->zcopy_single;
            UCS_PROFILE_REQUEST_EVENT(req, "start_zcopy_single", req->send.length);
        }
        return UCS_OK;
    }

    return UCS_ERR_NO_PROGRESS;
}

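/*
 * Fast-forward a send request to completion with the given status
 * (e.g. on endpoint error): pretend the whole buffer was sent and drive
 * the flush, UCT-completion, or plain send-completion path directly.
 */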
void ucp_request_send_state_ff(ucp_request_t *req, ucs_status_t status)
{
    /*
     * FIXME should not fast-forward requests owned by UCT
     */
    ucp_trace_req(req, "fast-forward with status %s", ucs_status_string(status));

    if (req->send.state.uct_comp.func == ucp_ep_flush_completion) {
        ucp_ep_flush_request_ff(req, status);
    } else if (req->send.state.uct_comp.func) {
        req->send.state.dt.offset = req->send.length;
        req->send.state.uct_comp.count = 0;
        req->send.state.uct_comp.func(&req->send.state.uct_comp, status);
    } else {
        ucp_request_complete_send(req, status);
    }
}

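/*
 * Report a truncated receive. For generic datatypes the unpack state is
 * finalized first, so that any resources held by the user's "finish"
 * hook are released before the error is returned.
 */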
ucs_status_t ucp_request_recv_msg_truncated(ucp_request_t *req, size_t length,
                                            size_t offset)
{
    ucp_dt_generic_t *dt_gen;

    ucs_debug("message truncated: recv_length %zu offset %zu buffer_size %zu",
              length, offset, req->recv.length);

    if (UCP_DT_IS_GENERIC(req->recv.datatype)) {
        dt_gen = ucp_dt_generic(req->recv.datatype);
        UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
                                    req->recv.state.dt.generic.state);
    }

    return UCS_ERR_MESSAGE_TRUNCATED;
}