/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifndef UCP_REQUEST_INL_
#define UCP_REQUEST_INL_

#include "ucp_request.h"
#include "ucp_worker.h"
#include "ucp_ep.inl"

#include <ucp/core/ucp_worker.h>
#include <ucp/dt/dt.h>
#include <ucs/profile/profile.h>
#include <ucs/datastruct/mpool.inl>
#include <ucp/dt/dt.inl>
#include <inttypes.h>


#define UCP_REQUEST_FLAGS_FMT \
    "%c%c%c%c%c%c%c"

#define UCP_REQUEST_FLAGS_ARG(_flags) \
    (((_flags) & UCP_REQUEST_FLAG_COMPLETED)       ? 'd' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_RELEASED)        ? 'f' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_EXPECTED)        ? 'e' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_LOCAL_COMPLETED) ? 'L' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_CALLBACK)        ? 'c' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_RECV)            ? 'r' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_SYNC)            ? 's' : '-')

#define UCP_RECV_DESC_FMT \
    "rdesc %p %c%c%c%c%c%c len %u+%u"

#define UCP_RECV_DESC_ARG(_rdesc) \
    (_rdesc), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_UCT_DESC)      ? 't' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER)         ? 'e' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_ONLY)    ? 'o' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_SYNC)    ? 's' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_OFFLOAD) ? 'f' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_RNDV)          ? 'r' : '-'), \
    (_rdesc)->payload_offset, \
    ((_rdesc)->length - (_rdesc)->payload_offset)


/* defined as a macro to print the call site */
#define ucp_request_get(_worker) \
    ({ \
        ucp_request_t *_req = ucs_mpool_get_inline(&(_worker)->req_mp); \
        if (_req != NULL) { \
            VALGRIND_MAKE_MEM_DEFINED(_req + 1, \
                                      (_worker)->context->config.request.size); \
            ucs_trace_req("allocated request %p", _req); \
            UCS_PROFILE_REQUEST_NEW(_req, "ucp_request", 0); \
        } \
        _req; \
    })
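
/*
 * Example (illustrative only, not taken from the original sources): a typical
 * internal caller allocates a request from the worker request mpool and
 * returns it to the pool with ucp_request_put() when done. "worker" is a
 * hypothetical ucp_worker_h.
 *
 *     ucp_request_t *req = ucp_request_get(worker);
 *     if (req == NULL) {
 *         return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
 *     }
 *     ... fill in req->flags and the send/recv part of the request ...
 *     ucp_request_put(req);
 */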

#define ucp_request_complete(_req, _cb, _status, ...) \
    { \
        (_req)->status = (_status); \
        if (ucs_likely((_req)->flags & UCP_REQUEST_FLAG_CALLBACK)) { \
            (_req)->_cb((_req) + 1, (_status), ## __VA_ARGS__); \
        } \
        if (ucs_unlikely(((_req)->flags |= UCP_REQUEST_FLAG_COMPLETED) & \
                         UCP_REQUEST_FLAG_RELEASED)) { \
            ucp_request_put(_req); \
        } \
    }

#define ucp_request_set_callback(_req, _cb, _cb_value, _user_data) \
    { \
        (_req)->_cb       = _cb_value; \
        (_req)->user_data = _user_data; \
        (_req)->flags    |= UCP_REQUEST_FLAG_CALLBACK; \
        ucs_trace_data("request %p %s set to %p, user data: %p", \
                       _req, #_cb, _cb_value, _user_data); \
    }


#define ucp_request_get_param(_worker, _param, _failed) \
    ({ \
        ucp_request_t *__req; \
        if (!((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_REQUEST)) { \
            __req = ucp_request_get(_worker); \
            if (ucs_unlikely((__req) == NULL)) { \
                _failed; \
            } \
        } else { \
            __req = ((ucp_request_t*)(_param)->request) - 1; \
        } \
        __req; \
    })


#define ucp_request_put_param(_param, _req) \
    if (!((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_REQUEST)) { \
        ucp_request_put(_req); \
    }


#define ucp_request_cb_param(_param, _req, _cb, ...) \
    if ((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) { \
        (_param)->cb._cb((_req) + 1, (_req)->status, ##__VA_ARGS__, \
                         (_param)->user_data); \
    }


#define ucp_request_imm_cmpl_param(_param, _req, _status, _cb, ...) \
    if ((_param)->op_attr_mask & UCP_OP_ATTR_FLAG_NO_IMM_CMPL) { \
        (_req)->status = (_status); \
        ucp_request_cb_param(_param, _req, _cb, ##__VA_ARGS__); \
        ucs_trace_req("request %p completed, but immediate completion is " \
                      "prohibited, status %s", _req, \
                      ucs_status_string(_status)); \
        return (_req) + 1; \
    } \
    ucp_request_put_param(_param, _req); \
    return UCS_STATUS_PTR(_status);


#define ucp_request_set_send_callback_param(_param, _req, _cb) \
    if ((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) { \
        ucp_request_set_callback(_req, _cb.cb, (_param)->cb.send, \
                                 ((_param)->op_attr_mask & \
                                  UCP_OP_ATTR_FIELD_USER_DATA) ? \
                                 (_param)->user_data : NULL); \
    }
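
/*
 * Illustrative sketch (an assumption, not part of the original file) of how
 * the request-param macros above are meant to compose in a hypothetical
 * "*_nbx" style entry point; "ucp_foo_nbx", "worker" and "do_send" are
 * placeholder names.
 *
 *     ucs_status_ptr_t ucp_foo_nbx(..., const ucp_request_param_t *param)
 *     {
 *         ucs_status_t  status;
 *         ucp_request_t *req;
 *
 *         req = ucp_request_get_param(worker, param,
 *                                     return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY));
 *         ucp_request_set_send_callback_param(param, req, send);
 *
 *         status = do_send(req);             // placeholder operation
 *         if (status != UCS_INPROGRESS) {
 *             // returns from ucp_foo_nbx: either completes in place, or
 *             // returns req + 1 when UCP_OP_ATTR_FLAG_NO_IMM_CMPL is set
 *             ucp_request_imm_cmpl_param(param, req, status, send);
 *         }
 *         return req + 1;                    // operation is still in progress
 *     }
 */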


static UCS_F_ALWAYS_INLINE void
ucp_request_put(ucp_request_t *req)
{
    ucs_trace_req("put request %p", req);
    UCS_PROFILE_REQUEST_FREE(req);
    ucs_mpool_put_inline(req);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_send(ucp_request_t *req, ucs_status_t status)
{
    ucs_trace_req("completing send request %p (%p) "UCP_REQUEST_FLAGS_FMT" %s",
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
                  ucs_status_string(status));
    UCS_PROFILE_REQUEST_EVENT(req, "complete_send", status);
    ucp_request_complete(req, send.cb, status, req->user_data);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status)
{
    ucs_trace_req("completing receive request %p (%p) "UCP_REQUEST_FLAGS_FMT
                  " stag 0x%"PRIx64" len %zu, %s",
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
                  req->recv.tag.info.sender_tag, req->recv.tag.info.length,
                  ucs_status_string(status));
    UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
    ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info,
                         req->user_data);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_stream_recv(ucp_request_t *req, ucp_ep_ext_proto_t* ep_ext,
                                 ucs_status_t status)
{
    /* dequeue request before complete */
    ucp_request_t *check_req UCS_V_UNUSED =
            ucs_queue_pull_elem_non_empty(&ep_ext->stream.match_q, ucp_request_t,
                                          recv.queue);
    ucs_assert(check_req               == req);
    ucs_assert((req->recv.stream.offset > 0) || UCS_STATUS_IS_ERR(status));

    req->recv.stream.length = req->recv.stream.offset;
    ucs_trace_req("completing stream receive request %p (%p) "
                  UCP_REQUEST_FLAGS_FMT" count %zu, %s",
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
                  req->recv.stream.length, ucs_status_string(status));
    UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
    ucp_request_complete(req, recv.stream.cb, status, req->recv.stream.length,
                         req->user_data);
}

static UCS_F_ALWAYS_INLINE int
ucp_request_can_complete_stream_recv(ucp_request_t *req)
{
    /* NOTE: the first check is needed to avoid the heavy "%" operation if the
     *       request is completely filled */
    if (req->recv.stream.offset == req->recv.length) {
        return 1;
    }

    if (req->flags & UCP_REQUEST_FLAG_STREAM_RECV_WAITALL) {
        return 0;
    }

    /* A 0-length stream recv is meaningless unless it was explicitly requested */
    if (req->recv.stream.offset == 0) {
        return 0;
    }

    if (ucs_likely(UCP_DT_IS_CONTIG(req->recv.datatype))) {
        return req->recv.stream.offset %
               ucp_contig_dt_elem_size(req->recv.datatype) == 0;
    }

    /* Currently, all datatypes except contig have a granularity of 1 byte */
    return 1;
}

/*
 * @return Whether the request completed.
 *         *req_status is filled with the completion status if it completed.
 */
static int UCS_F_ALWAYS_INLINE
ucp_request_try_send(ucp_request_t *req, ucs_status_t *req_status,
                     unsigned pending_flags)
{
    ucs_status_t status;

    /* coverity wrongly resolves (*req).send.uct.func to test_uct_pending::pending_send_op_ok */
    /* coverity[address_free] */
    status = req->send.uct.func(&req->send.uct);
    if (status == UCS_OK) {
        /* Completed the operation */
        *req_status = UCS_OK;
        return 1;
    } else if (status == UCS_INPROGRESS) {
        /* Not completed, but made progress */
        return 0;
    } else if (status != UCS_ERR_NO_RESOURCE) {
        /* Unexpected error */
        ucp_request_complete_send(req, status);
        *req_status = status;
        return 1;
    }

    /* No send resources, try to add to pending queue */
    ucs_assert(status == UCS_ERR_NO_RESOURCE);
    return ucp_request_pending_add(req, req_status, pending_flags);
}

/**
 * Start sending a request.
 *
 * @param [in]  req             Request to start.
 * @param [in]  pending_flags   Flags to be passed to UCT if the request is
 *                              added to the pending queue.
 *
 * @return UCS_OK - completed (callback will not be called)
 *         UCS_INPROGRESS - started but not completed
 *         other error - failure
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send(ucp_request_t *req, unsigned pending_flags)
{
    ucs_status_t status = UCS_ERR_NOT_IMPLEMENTED;
    while (!ucp_request_try_send(req, &status, pending_flags));
    return status;
}
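
/*
 * Illustrative sketch (hypothetical caller, not from the original sources) of
 * how the return value of ucp_request_send() is interpreted:
 *
 *     status = ucp_request_send(req, 0);
 *     if (status == UCS_OK) {
 *         // finished inline; the completion callback was not invoked, so the
 *         // caller performs its own immediate-completion handling
 *     } else if (status == UCS_INPROGRESS) {
 *         // started; the request will complete later from worker progress
 *     } else {
 *         // failure; the request was already completed with the error status
 *     }
 */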

static UCS_F_ALWAYS_INLINE
void ucp_request_send_generic_dt_finish(ucp_request_t *req)
{
    ucp_dt_generic_t *dt;
    if (UCP_DT_IS_GENERIC(req->send.datatype)) {
        dt = ucp_dt_generic(req->send.datatype);
        ucs_assert(NULL != dt);
        dt->ops.finish(req->send.state.dt.dt.generic.state);
    }
}

static UCS_F_ALWAYS_INLINE
void ucp_request_recv_generic_dt_finish(ucp_request_t *req)
{
    ucp_dt_generic_t *dt;
    if (UCP_DT_IS_GENERIC(req->recv.datatype)) {
        dt = ucp_dt_generic(req->recv.datatype);
        ucs_assert(NULL != dt);
        dt->ops.finish(req->recv.state.dt.generic.state);
    }
}

static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_init(ucp_request_t *req, ucp_datatype_t datatype,
                            size_t dt_count)
{
    ucp_dt_generic_t *dt_gen;
    void             *state_gen;

    VALGRIND_MAKE_MEM_UNDEFINED(&req->send.state.uct_comp.count,
                                sizeof(req->send.state.uct_comp.count));
    VALGRIND_MAKE_MEM_UNDEFINED(&req->send.state.dt.offset,
                                sizeof(req->send.state.dt.offset));

    req->send.state.uct_comp.func = NULL;

    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        req->send.state.dt.dt.contig.md_map     = 0;
        return;
    case UCP_DATATYPE_IOV:
        req->send.state.dt.dt.iov.iovcnt_offset = 0;
        req->send.state.dt.dt.iov.iov_offset    = 0;
        req->send.state.dt.dt.iov.iovcnt        = dt_count;
        req->send.state.dt.dt.iov.dt_reg        = NULL;
        return;
    case UCP_DATATYPE_GENERIC:
        dt_gen    = ucp_dt_generic(datatype);
        state_gen = dt_gen->ops.start_pack(dt_gen->context, req->send.buffer,
                                           dt_count);
        req->send.state.dt.dt.generic.state = state_gen;
        return;
    default:
        ucs_fatal("Invalid data type");
    }
}

static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_reset(ucp_request_t *req,
                             uct_completion_callback_t comp_cb, unsigned proto)
{
    switch (proto) {
    case UCP_REQUEST_SEND_PROTO_RMA:
        ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype));
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_RNDV_GET:
    case UCP_REQUEST_SEND_PROTO_RNDV_PUT:
    case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
        req->send.state.uct_comp.func       = comp_cb;
        req->send.state.uct_comp.count      = 0;
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_BCOPY_AM:
        req->send.state.dt.offset           = 0;
        break;
    default:
        ucs_fatal("unknown protocol");
    }
}

/**
 * Advance the state of a send request after a UCT operation. This function
 * applies @a new_dt_state to the @a req request according to the @a proto
 * protocol. The UCT completion counter is also incremented if @a proto
 * requires it.
 *
 * @param [inout]   req             Send request.
 * @param [in]      new_dt_state    State which was progressed by
 *                                  @ref ucp_dt_pack or @ref ucp_dt_iov_copy_uct.
 * @param [in]      proto           Internal UCP protocol identifier
 *                                  (UCP_REQUEST_SEND_PROTO_*)
 * @param [in]      status          Status of the last UCT operation which
 *                                  progressed @a proto protocol.
 */
static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_advance(ucp_request_t *req,
                               const ucp_dt_state_t *new_dt_state,
                               unsigned proto, ucs_status_t status)
{
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        /* Don't advance after a failed operation, so that the next attempt
         * continues from the last valid point.
         */
        return;
    }

    switch (proto) {
    case UCP_REQUEST_SEND_PROTO_RMA:
        if (status == UCS_INPROGRESS) {
            ++req->send.state.uct_comp.count;
        }
        break;
    case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_RNDV_GET:
    case UCP_REQUEST_SEND_PROTO_RNDV_PUT:
        if (status == UCS_INPROGRESS) {
            ++req->send.state.uct_comp.count;
        }
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_BCOPY_AM:
        ucs_assert(new_dt_state != NULL);
        if (UCP_DT_IS_CONTIG(req->send.datatype)) {
            /* cppcheck-suppress nullPointer */
            req->send.state.dt.offset = new_dt_state->offset;
        } else {
            /* cppcheck-suppress nullPointer */
            req->send.state.dt        = *new_dt_state;
        }
        break;
    default:
        ucs_fatal("unknown protocol");
    }

    /* offset is not used for RMA */
    ucs_assert((proto == UCP_REQUEST_SEND_PROTO_RMA) ||
               (req->send.state.dt.offset <= req->send.length));
}
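
/*
 * Illustrative sketch (simplified, an assumption rather than an excerpt from a
 * real protocol) of how ucp_request_send_state_reset() and
 * ucp_request_send_state_advance() are paired around UCT fragments for a
 * bcopy-AM style protocol; "pack_next_fragment" and "uct_send_fragment" are
 * placeholder helpers, and error handling is omitted:
 *
 *     ucp_dt_state_t state;
 *     ucs_status_t   status;
 *
 *     ucp_request_send_state_reset(req, NULL, UCP_REQUEST_SEND_PROTO_BCOPY_AM);
 *     do {
 *         state  = req->send.state.dt;     // work on a local copy of the state
 *         pack_next_fragment(req, &state); // advances state.offset
 *         status = uct_send_fragment(req);
 *         // commit the packed progress only if the UCT call did not fail
 *         ucp_request_send_state_advance(req, &state,
 *                                        UCP_REQUEST_SEND_PROTO_BCOPY_AM,
 *                                        status);
 *     } while (req->send.state.dt.offset < req->send.length);
 */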

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg(ucp_request_t *req, ucp_md_map_t md_map,
                            unsigned uct_flags)
{
    return ucp_request_memory_reg(req->send.ep->worker->context, md_map,
                                  (void*)req->send.buffer, req->send.length,
                                  req->send.datatype, &req->send.state.dt,
                                  req->send.mem_type, req, uct_flags);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg_lane_check(ucp_request_t *req, ucp_lane_index_t lane,
                                       ucp_md_map_t prev_md_map, unsigned uct_flags)
{
    ucp_md_map_t md_map;

    if (!(ucp_ep_md_attr(req->send.ep,
                         lane)->cap.flags & UCT_MD_FLAG_NEED_MEMH)) {
        return UCS_OK;
    }

    ucs_assert(ucp_ep_md_attr(req->send.ep,
                              lane)->cap.flags & UCT_MD_FLAG_REG);
    md_map = UCS_BIT(ucp_ep_md_index(req->send.ep, lane)) | prev_md_map;
    return ucp_request_send_buffer_reg(req, md_map, uct_flags);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg_lane(ucp_request_t *req, ucp_lane_index_t lane,
                                 unsigned uct_flags)
{
    return ucp_request_send_buffer_reg_lane_check(req, lane, 0, uct_flags);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_send_request_add_reg_lane(ucp_request_t *req, ucp_lane_index_t lane)
{
    /* Add the new lane to the registration map */
    ucp_md_map_t md_map;

    if (ucs_likely(UCP_DT_IS_CONTIG(req->send.datatype))) {
        md_map = req->send.state.dt.dt.contig.md_map;
    } else if (UCP_DT_IS_IOV(req->send.datatype) &&
               (req->send.state.dt.dt.iov.dt_reg != NULL)) {
        /* dt_reg can be NULL if the underlying UCT TL doesn't require a
         * memory handle for local AM/GET/PUT operations
         * (i.e. UCT_MD_FLAG_NEED_MEMH is not set) */
        /* Can use the first DT registration element, since
         * they all have the same MD maps */
        md_map = req->send.state.dt.dt.iov.dt_reg[0].md_map;
    } else {
        md_map = 0;
    }

    ucs_assert(ucs_popcount(md_map) <= UCP_MAX_OP_MDS);
    return ucp_request_send_buffer_reg_lane_check(req, lane, md_map, 0);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_recv_buffer_reg(ucp_request_t *req, ucp_md_map_t md_map,
                            size_t length)
{
    return ucp_request_memory_reg(req->recv.worker->context, md_map,
                                  req->recv.buffer, length,
                                  req->recv.datatype, &req->recv.state,
                                  req->recv.mem_type, req,
                                  UCT_MD_MEM_FLAG_HIDE_ERRORS);
}

static UCS_F_ALWAYS_INLINE void ucp_request_send_buffer_dereg(ucp_request_t *req)
{
    ucp_request_memory_dereg(req->send.ep->worker->context, req->send.datatype,
                             &req->send.state.dt, req);
}

static UCS_F_ALWAYS_INLINE void ucp_request_recv_buffer_dereg(ucp_request_t *req)
{
    ucp_request_memory_dereg(req->recv.worker->context, req->recv.datatype,
                             &req->recv.state, req);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_wait_uct_comp(ucp_request_t *req)
{
    while (req->send.state.uct_comp.count > 0) {
        ucp_worker_progress(req->send.ep->worker);
    }
}

static UCS_F_ALWAYS_INLINE void
ucp_request_unpack_contig(ucp_request_t *req, void *buf, const void *data,
                          size_t length)
{
    if (ucs_likely(UCP_MEM_IS_ACCESSIBLE_FROM_CPU(req->recv.mem_type))) {
        UCS_PROFILE_NAMED_CALL("memcpy_recv", ucs_memcpy_relaxed, buf,
                               data, length);
    } else {
        ucp_mem_type_unpack(req->recv.worker, buf, data, length,
                            req->recv.mem_type);
    }
}

/**
 * Unpack received data into a receive request.
 *
 * @param [in]  req     Receive request.
 * @param [in]  data    Data to unpack.
 * @param [in]  length  Length of @a data, in bytes.
 * @param [in]  offset  Offset of the received data within the request, for
 *                      out-of-order fragments.
 * @param [in]  last    Whether this is the last fragment of the message.
 *
 * @return UCS_OK on success, or an error status (for example, if the message
 *         is truncated or a generic datatype fails to unpack).
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_recv_data_unpack(ucp_request_t *req, const void *data,
                             size_t length, size_t offset, int last)
{
    ucp_dt_generic_t *dt_gen;
    ucs_status_t status;

    ucs_assert(req->status == UCS_OK);

    ucp_trace_req(req, "unpack recv_data req_len %zu data_len %zu offset %zu last: %s",
                  req->recv.length, length, offset, last ? "yes" : "no");

    if (ucs_unlikely((length + offset) > req->recv.length)) {
        return ucp_request_recv_msg_truncated(req, length, offset);
    }

    switch (req->recv.datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        ucp_request_unpack_contig(req,
                                  UCS_PTR_BYTE_OFFSET(req->recv.buffer, offset),
                                  data, length);
        return UCS_OK;

    case UCP_DATATYPE_IOV:
        if (offset != req->recv.state.offset) {
            ucp_dt_iov_seek(req->recv.buffer, req->recv.state.dt.iov.iovcnt,
                            offset - req->recv.state.offset,
                            &req->recv.state.dt.iov.iov_offset,
                            &req->recv.state.dt.iov.iovcnt_offset);
            req->recv.state.offset = offset;
        }
        UCS_PROFILE_CALL(ucp_dt_iov_scatter, req->recv.buffer,
                         req->recv.state.dt.iov.iovcnt, data, length,
                         &req->recv.state.dt.iov.iov_offset,
                         &req->recv.state.dt.iov.iovcnt_offset);
        req->recv.state.offset += length;
        return UCS_OK;

    case UCP_DATATYPE_GENERIC:
        dt_gen = ucp_dt_generic(req->recv.datatype);
        status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack,
                                        req->recv.state.dt.generic.state,
                                        offset, data, length);
        if (last || (status != UCS_OK)) {
            UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
                                        req->recv.state.dt.generic.state);
        }
        return status;

    default:
        ucs_fatal("unexpected datatype=%lx", req->recv.datatype);
    }
}

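/*
 * Initialize a receive descriptor for data arriving in an active-message
 * callback. If the transport allows retaining its descriptor
 * (UCT_CB_PARAM_FLAG_DESC), the rdesc header is placed in the headroom in
 * front of the data and UCS_INPROGRESS is returned, so the caller keeps
 * ownership of the UCT descriptor; otherwise the data is copied into a
 * descriptor taken from the worker's AM mpool and UCS_OK is returned.
 */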
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_recv_desc_init(ucp_worker_h worker, void *data, size_t length,
                   int data_offset, unsigned am_flags, uint16_t hdr_len,
                   uint16_t rdesc_flags, int priv_length,
                   ucp_recv_desc_t **rdesc_p)
{
    ucp_recv_desc_t *rdesc;
    void *data_hdr;
    ucs_status_t status;

    if (ucs_unlikely(am_flags & UCT_CB_PARAM_FLAG_DESC)) {
        /* slowpath */
        ucs_assert(priv_length <= UCP_WORKER_HEADROOM_PRIV_SIZE);
        data_hdr               = UCS_PTR_BYTE_OFFSET(data, -data_offset);
        rdesc                  = (ucp_recv_desc_t *)data_hdr - 1;
        rdesc->flags           = rdesc_flags | UCP_RECV_DESC_FLAG_UCT_DESC;
        rdesc->uct_desc_offset = UCP_WORKER_HEADROOM_PRIV_SIZE - priv_length;
        status                 = UCS_INPROGRESS;
    } else {
        rdesc = (ucp_recv_desc_t*)ucs_mpool_get_inline(&worker->am_mp);
        if (rdesc == NULL) {
            ucs_error("failed to allocate ucp recv descriptor");
            return UCS_ERR_NO_MEMORY;
        }

        /* No need to initialize rdesc->priv_length here, because it is only
         * needed for releasing a UCT descriptor. */

        rdesc->flags = rdesc_flags;
        status       = UCS_OK;
        memcpy(UCS_PTR_BYTE_OFFSET(rdesc + 1, data_offset), data, length);
    }

    rdesc->length         = length + data_offset;
    rdesc->payload_offset = hdr_len;
    *rdesc_p              = rdesc;
    return status;
}

static UCS_F_ALWAYS_INLINE void
ucp_recv_desc_release(ucp_recv_desc_t *rdesc)
{
    void *uct_desc;

    ucs_trace_req("release receive descriptor %p", rdesc);
    if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_UCT_DESC)) {
        /* releasing a UCT descriptor is the slow path */
        uct_desc = UCS_PTR_BYTE_OFFSET(rdesc, -rdesc->uct_desc_offset);
        uct_iface_release_desc(uct_desc);
    } else {
        ucs_mpool_put_inline(rdesc);
    }
}


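/*
 * Multi-lane (AM bandwidth) send helpers: ucp_send_request_get_am_bw_lane()
 * returns the lane selected by the request's current am_bw_index, and
 * ucp_send_request_next_am_bw_lane() advances the index, wrapping back to the
 * first lane once it runs past the configured am_bw_lanes.
 */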
static UCS_F_ALWAYS_INLINE ucp_lane_index_t
ucp_send_request_get_am_bw_lane(ucp_request_t *req)
{
    ucp_lane_index_t lane;

    lane = ucp_ep_config(req->send.ep)->
           key.am_bw_lanes[req->send.msg_proto.am_bw_index];
    ucs_assertv(lane != UCP_NULL_LANE, "req->send.msg_proto.am_bw_index=%d",
                req->send.msg_proto.am_bw_index);
    return lane;
}

static UCS_F_ALWAYS_INLINE void
ucp_send_request_next_am_bw_lane(ucp_request_t *req)
{
    ucp_lane_index_t am_bw_index = ++req->send.msg_proto.am_bw_index;
    ucp_ep_config_t *config      = ucp_ep_config(req->send.ep);

    if ((am_bw_index >= UCP_MAX_LANES) ||
        (config->key.am_bw_lanes[am_bw_index] == UCP_NULL_LANE)) {
        req->send.msg_proto.am_bw_index = 0;
    }
}

static UCS_F_ALWAYS_INLINE uintptr_t ucp_request_get_dest_ep_ptr(ucp_request_t *req)
{
    /* This function may return 0, but in such cases the message should not be
     * sent at all because the am_lane would point to a wireup (proxy) endpoint.
     * So only the receiver side has an assertion that ep_ptr != 0.
     */
    return ucp_ep_dest_ep_ptr(req->send.ep);
}

static UCS_F_ALWAYS_INLINE uint32_t
ucp_request_param_flags(const ucp_request_param_t *param)
{
    return (param->op_attr_mask & UCP_OP_ATTR_FIELD_FLAGS) ?
           param->flags : 0;
}

static UCS_F_ALWAYS_INLINE ucp_datatype_t
ucp_request_param_datatype(const ucp_request_param_t *param)
{
    return (param->op_attr_mask & UCP_OP_ATTR_FIELD_DATATYPE) ?
           param->datatype : ucp_dt_make_contig(1);
}

#endif /* UCP_REQUEST_INL_ */