/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "ucp_context.h"
#include "ucp_worker.h"
#include "ucp_request.inl"

#include <ucp/proto/proto_am.h>

#include <ucs/datastruct/mpool.inl>
#include <ucs/debug/debug.h>
#include <ucs/debug/log.h>


const ucp_request_param_t ucp_request_null_param = { .op_attr_mask = 0 };

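/* The request handle exposed to the application points just past the internal
 * ucp_request_t descriptor, so the descriptor is recovered by stepping one
 * element back from the user pointer. */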
int ucp_request_is_completed(void *request)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;
    return !!(req->flags & UCP_REQUEST_FLAG_COMPLETED);
}

ucs_status_t ucp_request_check_status(void *request)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;

    if (req->flags & UCP_REQUEST_FLAG_COMPLETED) {
        ucs_assert(req->status != UCS_INPROGRESS);
        return req->status;
    }
    return UCS_INPROGRESS;
}

ucs_status_t ucp_tag_recv_request_test(void *request, ucp_tag_recv_info_t *info)
{
    ucp_request_t *req  = (ucp_request_t*)request - 1;
    ucs_status_t status = ucp_request_check_status(request);

    if (status != UCS_INPROGRESS) {
        ucs_assert(req->flags & UCP_REQUEST_FLAG_RECV);
        *info = req->recv.tag.info;
    }

    return status;
}

ucs_status_t ucp_stream_recv_request_test(void *request, size_t *length_p)
{
    ucp_request_t *req  = (ucp_request_t*)request - 1;
    ucs_status_t status = ucp_request_check_status(request);

    if (status != UCS_INPROGRESS) {
        ucs_assert(req->flags & UCP_REQUEST_FLAG_STREAM_RECV);
        *length_p = req->recv.stream.length;
    }

    return status;
}

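/* Common helper for ucp_request_release()/ucp_request_free(): if the request
 * has already completed, return it to the worker's memory pool immediately;
 * otherwise mark it as released (and clear the bits in cb_flag) so it is
 * freed when completion eventually happens. */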
static UCS_F_ALWAYS_INLINE void
ucp_request_release_common(void *request, uint8_t cb_flag, const char *debug_name)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;
    ucp_worker_h UCS_V_UNUSED worker = ucs_container_of(ucs_mpool_obj_owner(req),
                                                        ucp_worker_t, req_mp);
    uint32_t flags;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    flags = req->flags;
    ucs_trace_req("%s request %p (%p) "UCP_REQUEST_FLAGS_FMT, debug_name,
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(flags));

    ucs_assert(!(flags & UCP_REQUEST_DEBUG_FLAG_EXTERNAL));
    ucs_assert(!(flags & UCP_REQUEST_FLAG_RELEASED));

    if (ucs_likely(flags & UCP_REQUEST_FLAG_COMPLETED)) {
        ucp_request_put(req);
    } else {
        req->flags = (flags | UCP_REQUEST_FLAG_RELEASED) & ~cb_flag;
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}

UCS_PROFILE_FUNC_VOID(ucp_request_release, (request), void *request)
{
    /* mark request as released */
    ucp_request_release_common(request, 0, "release");
}

UCS_PROFILE_FUNC_VOID(ucp_request_free, (request), void *request)
{
    /* mark request as released and disable the callback */
    ucp_request_release_common(request, UCP_REQUEST_FLAG_CALLBACK, "free");
}

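/* Request allocation by the application is not supported; requests are taken
 * from the worker's memory pool internally, so this always returns NULL. */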
UCS_PROFILE_FUNC(void*, ucp_request_alloc,
                 (worker),
                 ucp_worker_h worker)
{
    return NULL;
}

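/* Cancel a receive request: only expected (not yet matched) tag receives can
 * be canceled; a request that has already completed is left untouched. */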
UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request),
                      ucp_worker_h worker, void *request)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;
    int removed;

    if (req->flags & UCP_REQUEST_FLAG_COMPLETED) {
        return;
    }

    if (req->flags & UCP_REQUEST_FLAG_EXPECTED) {
        UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

        removed = ucp_tag_exp_remove(&worker->tm, req);
        /* If the tag was posted to the transport, we need to wait for its
         * completion */
        if (removed && !(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) {
            ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED);
        }

        UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
    }
}

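/* Memory pool hooks which invoke the application-provided request init/cleanup
 * callbacks on the user-visible part of the request (located right after the
 * internal descriptor, hence "req + 1"). */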
static void ucp_worker_request_init_proxy(ucs_mpool_t *mp, void *obj, void *chunk)
{
    ucp_worker_h worker   = ucs_container_of(mp, ucp_worker_t, req_mp);
    ucp_context_h context = worker->context;
    ucp_request_t *req    = obj;

    if (context->config.request.init != NULL) {
        context->config.request.init(req + 1);
    }
}

static void ucp_worker_request_fini_proxy(ucs_mpool_t *mp, void *obj)
{
    ucp_worker_h worker   = ucs_container_of(mp, ucp_worker_t, req_mp);
    ucp_context_h context = worker->context;
    ucp_request_t *req    = obj;

    if (context->config.request.cleanup != NULL) {
        context->config.request.cleanup(req + 1);
    }
}

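/* Operations for the worker's request memory pool; chunks are allocated from
 * hugetlb-backed memory via ucs_mpool_hugetlb_malloc when available. */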
ucs_mpool_ops_t ucp_request_mpool_ops = {
    .chunk_alloc   = ucs_mpool_hugetlb_malloc,
    .chunk_release = ucs_mpool_hugetlb_free,
    .obj_init      = ucp_worker_request_init_proxy,
    .obj_cleanup   = ucp_worker_request_fini_proxy
};

ucs_mpool_ops_t ucp_rndv_get_mpool_ops = {
    .chunk_alloc   = ucs_mpool_chunk_malloc,
    .chunk_release = ucs_mpool_chunk_free,
    .obj_init      = NULL,
    .obj_cleanup   = NULL
};

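/* Try to queue a request on the UCT pending queue of its send lane.
 * Returns 1 if the request was queued (and *req_status is set to
 * UCS_INPROGRESS), or 0 if the transport reported UCS_ERR_BUSY and the send
 * should be retried. Any other status from uct_ep_pending_add() is fatal. */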
int ucp_request_pending_add(ucp_request_t *req, ucs_status_t *req_status,
                            unsigned pending_flags)
{
    ucs_status_t status;
    uct_ep_h uct_ep;

    ucs_assertv(req->send.lane != UCP_NULL_LANE, "%s() did not set req->send.lane",
                ucs_debug_get_symbol_name(req->send.uct.func));

    uct_ep = req->send.ep->uct_eps[req->send.lane];
    status = uct_ep_pending_add(uct_ep, &req->send.uct, pending_flags);
    if (status == UCS_OK) {
        ucs_trace_data("ep %p: added pending uct request %p to lane[%d]=%p",
                       req->send.ep, req, req->send.lane, uct_ep);
        *req_status            = UCS_INPROGRESS;
        req->send.pending_lane = req->send.lane;
        return 1;
    } else if (status == UCS_ERR_BUSY) {
        /* Could not add, try to send again */
        return 0;
    }

    /* Unexpected error while adding to pending */
    ucs_fatal("invalid return status from uct_ep_pending_add(): %s",
              ucs_status_string(status));
}

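/* Deregister the first 'count' entries of a registration array by
 * re-registering each one with an empty memory-domain map. */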
static void ucp_request_dt_dereg(ucp_context_t *context, ucp_dt_reg_t *dt_reg,
                                 size_t count, ucp_request_t *req_dbg)
{
    size_t i;

    for (i = 0; i < count; ++i) {
        ucp_trace_req(req_dbg, "mem dereg buffer %ld/%ld md_map 0x%"PRIx64,
                      i, count, dt_reg[i].md_map);
        ucp_mem_rereg_mds(context, 0, NULL, 0, 0, NULL, UCS_MEMORY_TYPE_HOST, NULL,
                          dt_reg[i].memh, &dt_reg[i].md_map);
        ucs_assert(dt_reg[i].md_map == 0);
    }
}

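/* Register the data buffer with all memory domains in md_map. Contiguous
 * datatypes are registered in place; for IOV datatypes a registration entry
 * is allocated (or reused) per IOV element, and any partial registration is
 * rolled back on failure. */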
UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
                 (context, md_map, buffer, length, datatype, state, mem_type, req_dbg, uct_flags),
                 ucp_context_t *context, ucp_md_map_t md_map, void *buffer,
                 size_t length, ucp_datatype_t datatype, ucp_dt_state_t *state,
                 ucs_memory_type_t mem_type, ucp_request_t *req_dbg, unsigned uct_flags)
{
    size_t iov_it, iovcnt;
    const ucp_dt_iov_t *iov;
    ucp_dt_reg_t *dt_reg;
    ucs_status_t status;
    int flags;
    int level;

    ucs_trace_func("context=%p md_map=0x%lx buffer=%p length=%zu datatype=0x%lx "
                   "state=%p", context, md_map, buffer, length, datatype, state);

    status = UCS_OK;
    flags  = UCT_MD_MEM_ACCESS_RMA | uct_flags;
    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        ucs_assert(ucs_popcount(md_map) <= UCP_MAX_OP_MDS);
        status = ucp_mem_rereg_mds(context, md_map, buffer, length, flags,
                                   NULL, mem_type, NULL, state->dt.contig.memh,
                                   &state->dt.contig.md_map);
        ucp_trace_req(req_dbg, "mem reg md_map 0x%"PRIx64"/0x%"PRIx64,
                      state->dt.contig.md_map, md_map);
        break;
    case UCP_DATATYPE_IOV:
        iovcnt = state->dt.iov.iovcnt;
        iov    = buffer;
        dt_reg = ((state->dt.iov.dt_reg == NULL) ?
                  ucs_calloc(iovcnt, sizeof(*dt_reg), "iov_dt_reg") :
                  state->dt.iov.dt_reg);
        if (NULL == dt_reg) {
            status = UCS_ERR_NO_MEMORY;
            goto err;
        }
        for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
            if (iov[iov_it].length) {
                status = ucp_mem_rereg_mds(context, md_map, iov[iov_it].buffer,
                                           iov[iov_it].length, flags, NULL,
                                           mem_type, NULL, dt_reg[iov_it].memh,
                                           &dt_reg[iov_it].md_map);
                if (status != UCS_OK) {
                    /* unregister previously registered memory */
                    ucp_request_dt_dereg(context, dt_reg, iov_it, req_dbg);
                    ucs_free(dt_reg);
                    goto err;
                }
                ucp_trace_req(req_dbg,
                              "mem reg iov %ld/%ld md_map 0x%"PRIx64"/0x%"PRIx64,
                              iov_it, iovcnt, dt_reg[iov_it].md_map, md_map);
            }
        }
        state->dt.iov.dt_reg = dt_reg;
        break;
    default:
        status = UCS_ERR_INVALID_PARAM;
        ucs_error("Invalid data type %lx", datatype);
    }

err:
    if (status != UCS_OK) {
        level = (flags & UCT_MD_MEM_FLAG_HIDE_ERRORS) ?
                UCS_LOG_LEVEL_DEBUG : UCS_LOG_LEVEL_ERROR;
        ucs_log(level,
                "failed to register user buffer datatype 0x%lx address %p len %zu:"
                " %s", datatype, buffer, length, ucs_status_string(status));
    }
    return status;
}

UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg, (context, datatype, state, req_dbg),
                      ucp_context_t *context, ucp_datatype_t datatype,
                      ucp_dt_state_t *state, ucp_request_t *req_dbg)
{
    ucs_trace_func("context=%p datatype=0x%lx state=%p", context, datatype,
                   state);

    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        ucp_request_dt_dereg(context, &state->dt.contig, 1, req_dbg);
        break;
    case UCP_DATATYPE_IOV:
        if (state->dt.iov.dt_reg != NULL) {
            ucp_request_dt_dereg(context, state->dt.iov.dt_reg,
                                 state->dt.iov.iovcnt, req_dbg);
            ucs_free(state->dt.iov.dt_reg);
            state->dt.iov.dt_reg = NULL;
        }
        break;
    default:
        break;
    }
}

/* NOTE: deprecated */
ucs_status_t ucp_request_test(void *request, ucp_tag_recv_info_t *info)
{
    ucp_request_t *req = (ucp_request_t*)request - 1;

    if (req->flags & UCP_REQUEST_FLAG_COMPLETED) {
        if (req->flags & UCP_REQUEST_FLAG_RECV) {
            *info = req->recv.tag.info;
        }
        ucs_assert(req->status != UCS_INPROGRESS);
        return req->status;
    }
    return UCS_INPROGRESS;
}

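/* Set up a request for a multi-fragment protocol: assign a per-worker message
 * id for tag/AM sends so the receiver can reassemble the fragments. */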
static UCS_F_ALWAYS_INLINE
void ucp_request_init_multi_proto(ucp_request_t *req,
                                  uct_pending_callback_t multi_func,
                                  const char *multi_func_str)
{
    req->send.uct.func = multi_func;

    if (req->flags & (UCP_REQUEST_FLAG_SEND_TAG |
                      UCP_REQUEST_FLAG_SEND_AM)) {
        req->send.msg_proto.message_id  = req->send.ep->worker->am_message_id++;
        req->send.msg_proto.am_bw_index = 0;
    }

    req->send.pending_lane = UCP_NULL_LANE;
    UCS_PROFILE_REQUEST_EVENT(req, multi_func_str, req->send.length);
}

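/* Select the eager protocol for a send request based on its length: short for
 * length <= max_short, buffered copy below zcopy_thresh, and zero copy below
 * zcopy_max (splitting into multiple fragments when the message or IOV list
 * exceeds the transport limits). Returns UCS_ERR_NO_PROGRESS if no threshold
 * matches and the caller must fall back to another protocol (e.g. rendezvous). */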
ucs_status_t
ucp_request_send_start(ucp_request_t *req, ssize_t max_short,
                       size_t zcopy_thresh, size_t zcopy_max, size_t dt_count,
                       const ucp_ep_msg_config_t *msg_config,
                       const ucp_request_send_proto_t *proto)
{
    size_t length = req->send.length;
    ucs_status_t status;
    int multi;

    if ((ssize_t)length <= max_short) {
        /* short */
        req->send.uct.func = proto->contig_short;
        UCS_PROFILE_REQUEST_EVENT(req, "start_contig_short", req->send.length);
        return UCS_OK;
    } else if (length < zcopy_thresh) {
        /* bcopy */
        ucp_request_send_state_reset(req, NULL, UCP_REQUEST_SEND_PROTO_BCOPY_AM);
        ucs_assert(msg_config->max_bcopy >= proto->only_hdr_size);
        if (length <= (msg_config->max_bcopy - proto->only_hdr_size)) {
            req->send.uct.func = proto->bcopy_single;
            UCS_PROFILE_REQUEST_EVENT(req, "start_bcopy_single", req->send.length);
        } else {
            ucp_request_init_multi_proto(req, proto->bcopy_multi,
                                         "start_bcopy_multi");
        }
        return UCS_OK;
    } else if (length < zcopy_max) {
        /* zcopy */
        ucp_request_send_state_reset(req, proto->zcopy_completion,
                                     UCP_REQUEST_SEND_PROTO_ZCOPY_AM);
        status = ucp_request_send_buffer_reg_lane(req, req->send.lane, 0);
        if (status != UCS_OK) {
            return status;
        }

        if (ucs_unlikely(length > msg_config->max_zcopy - proto->only_hdr_size)) {
            multi = 1;
        } else if (ucs_unlikely(UCP_DT_IS_IOV(req->send.datatype))) {
            if (dt_count <= msg_config->max_iov) {
                multi = 0;
            } else {
                multi = ucp_dt_iov_count_nonempty(req->send.buffer, dt_count) >
                        msg_config->max_iov;
            }
        } else {
            multi = 0;
        }

        if (multi) {
            ucp_request_init_multi_proto(req, proto->zcopy_multi,
                                         "start_zcopy_multi");
        } else {
            req->send.uct.func = proto->zcopy_single;
            UCS_PROFILE_REQUEST_EVENT(req, "start_zcopy_single", req->send.length);
        }
        return UCS_OK;
    }

    return UCS_ERR_NO_PROGRESS;
}

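/* Fast-forward a send request to completion with the given status, invoking
 * the pending UCT completion callback (or the flush-specific path) as if the
 * remaining data had been sent. */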
void ucp_request_send_state_ff(ucp_request_t *req, ucs_status_t status)
{
    /*
     * FIXME should not fast-forward requests owned by UCT
     */
    ucp_trace_req(req, "fast-forward with status %s", ucs_status_string(status));

    if (req->send.state.uct_comp.func == ucp_ep_flush_completion) {
        ucp_ep_flush_request_ff(req, status);
    } else if (req->send.state.uct_comp.func) {
        req->send.state.dt.offset      = req->send.length;
        req->send.state.uct_comp.count = 0;
        req->send.state.uct_comp.func(&req->send.state.uct_comp, status);
    } else {
        ucp_request_complete_send(req, status);
    }
}

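/* Handle an incoming message that is larger than the posted receive buffer:
 * finalize a generic datatype state, if any, and report truncation. */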
ucs_status_t ucp_request_recv_msg_truncated(ucp_request_t *req, size_t length,
                                            size_t offset)
{
    ucp_dt_generic_t *dt_gen;

    ucs_debug("message truncated: recv_length %zu offset %zu buffer_size %zu",
              length, offset, req->recv.length);

    if (UCP_DT_IS_GENERIC(req->recv.datatype)) {
        dt_gen = ucp_dt_generic(req->recv.datatype);
        UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
                                    req->recv.state.dt.generic.state);
    }

    return UCS_ERR_MESSAGE_TRUNCATED;
}