1/** 2 * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED. 3 * 4 * See file LICENSE for terms. 5 */ 6 7#ifndef UCP_REQUEST_INL_ 8#define UCP_REQUEST_INL_ 9 10#include "ucp_request.h" 11#include "ucp_worker.h" 12#include "ucp_ep.inl" 13 14#include <ucp/core/ucp_worker.h> 15#include <ucp/dt/dt.h> 16#include <ucs/profile/profile.h> 17#include <ucs/datastruct/mpool.inl> 18#include <ucp/dt/dt.inl> 19#include <inttypes.h> 20 21 22#define UCP_REQUEST_FLAGS_FMT \ 23 "%c%c%c%c%c%c%c" 24 25#define UCP_REQUEST_FLAGS_ARG(_flags) \ 26 (((_flags) & UCP_REQUEST_FLAG_COMPLETED) ? 'd' : '-'), \ 27 (((_flags) & UCP_REQUEST_FLAG_RELEASED) ? 'f' : '-'), \ 28 (((_flags) & UCP_REQUEST_FLAG_EXPECTED) ? 'e' : '-'), \ 29 (((_flags) & UCP_REQUEST_FLAG_LOCAL_COMPLETED) ? 'L' : '-'), \ 30 (((_flags) & UCP_REQUEST_FLAG_CALLBACK) ? 'c' : '-'), \ 31 (((_flags) & UCP_REQUEST_FLAG_RECV) ? 'r' : '-'), \ 32 (((_flags) & UCP_REQUEST_FLAG_SYNC) ? 's' : '-') 33 34#define UCP_RECV_DESC_FMT \ 35 "rdesc %p %c%c%c%c%c%c len %u+%u" 36 37#define UCP_RECV_DESC_ARG(_rdesc) \ 38 (_rdesc), \ 39 (((_rdesc)->flags & UCP_RECV_DESC_FLAG_UCT_DESC) ? 't' : '-'), \ 40 (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER) ? 'e' : '-'), \ 41 (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_ONLY) ? 'o' : '-'), \ 42 (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_SYNC) ? 's' : '-'), \ 43 (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_OFFLOAD) ? 'f' : '-'), \ 44 (((_rdesc)->flags & UCP_RECV_DESC_FLAG_RNDV) ? 
'r' : '-'), \ 45 (_rdesc)->payload_offset, \ 46 ((_rdesc)->length - (_rdesc)->payload_offset) 47 48 49/* defined as a macro to print the call site */ 50#define ucp_request_get(_worker) \ 51 ({ \ 52 ucp_request_t *_req = ucs_mpool_get_inline(&(_worker)->req_mp); \ 53 if (_req != NULL) { \ 54 VALGRIND_MAKE_MEM_DEFINED(_req + 1, \ 55 (_worker)->context->config.request.size); \ 56 ucs_trace_req("allocated request %p", _req); \ 57 UCS_PROFILE_REQUEST_NEW(_req, "ucp_request", 0); \ 58 } \ 59 _req; \ 60 }) 61 62#define ucp_request_complete(_req, _cb, _status, ...) \ 63 { \ 64 (_req)->status = (_status); \ 65 if (ucs_likely((_req)->flags & UCP_REQUEST_FLAG_CALLBACK)) { \ 66 (_req)->_cb((_req) + 1, (_status), ## __VA_ARGS__); \ 67 } \ 68 if (ucs_unlikely(((_req)->flags |= UCP_REQUEST_FLAG_COMPLETED) & \ 69 UCP_REQUEST_FLAG_RELEASED)) { \ 70 ucp_request_put(_req); \ 71 } \ 72 } 73 74#define ucp_request_set_callback(_req, _cb, _cb_value, _user_data) \ 75 { \ 76 (_req)->_cb = _cb_value; \ 77 (_req)->user_data = _user_data; \ 78 (_req)->flags |= UCP_REQUEST_FLAG_CALLBACK; \ 79 ucs_trace_data("request %p %s set to %p, user data: %p", \ 80 _req, #_cb, _cb_value, _user_data); \ 81 } 82 83 84#define ucp_request_get_param(_worker, _param, _failed) \ 85 ({ \ 86 ucp_request_t *__req; \ 87 if (!((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_REQUEST)) { \ 88 __req = ucp_request_get(_worker); \ 89 if (ucs_unlikely((__req) == NULL)) { \ 90 _failed; \ 91 } \ 92 } else { \ 93 __req = ((ucp_request_t*)(_param)->request) - 1; \ 94 } \ 95 __req; \ 96 }) 97 98 99#define ucp_request_put_param(_param, _req) \ 100 if (!((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_REQUEST)) { \ 101 ucp_request_put(_req); \ 102 } 103 104 105#define ucp_request_cb_param(_param, _req, _cb, ...) \ 106 if ((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) { \ 107 param->cb._cb(req + 1, status, ##__VA_ARGS__, param->user_data); \ 108 } 109 110 111#define ucp_request_imm_cmpl_param(_param, _req, _status, _cb, ...) 
\ 112 if ((_param)->op_attr_mask & UCP_OP_ATTR_FLAG_NO_IMM_CMPL) { \ 113 ucp_request_cb_param(_param, _req, _cb, ##__VA_ARGS__); \ 114 ucs_trace_req("request %p completed, but immediate completion is " \ 115 "prohibited, status %s", _req, \ 116 ucs_status_string(_status)); \ 117 return (_req) + 1; \ 118 } \ 119 ucp_request_put_param(_param, _req); \ 120 return UCS_STATUS_PTR(_status); 121 122 123#define ucp_request_set_send_callback_param(_param, _req, _cb) \ 124 if ((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) { \ 125 ucp_request_set_callback(_req, _cb.cb, (_param)->cb.send, \ 126 ((_param)->op_attr_mask & \ 127 UCP_OP_ATTR_FIELD_USER_DATA) ? \ 128 (_param)->user_data : NULL); \ 129 } 130 131 132static UCS_F_ALWAYS_INLINE void 133ucp_request_put(ucp_request_t *req) 134{ 135 ucs_trace_req("put request %p", req); 136 UCS_PROFILE_REQUEST_FREE(req); 137 ucs_mpool_put_inline(req); 138} 139 140static UCS_F_ALWAYS_INLINE void 141ucp_request_complete_send(ucp_request_t *req, ucs_status_t status) 142{ 143 ucs_trace_req("completing send request %p (%p) "UCP_REQUEST_FLAGS_FMT" %s", 144 req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags), 145 ucs_status_string(status)); 146 UCS_PROFILE_REQUEST_EVENT(req, "complete_send", status); 147 ucp_request_complete(req, send.cb, status, req->user_data); 148} 149 150static UCS_F_ALWAYS_INLINE void 151ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status) 152{ 153 ucs_trace_req("completing receive request %p (%p) "UCP_REQUEST_FLAGS_FMT 154 " stag 0x%"PRIx64" len %zu, %s", 155 req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags), 156 req->recv.tag.info.sender_tag, req->recv.tag.info.length, 157 ucs_status_string(status)); 158 UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status); 159 ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info, 160 req->user_data); 161} 162 163static UCS_F_ALWAYS_INLINE void 164ucp_request_complete_stream_recv(ucp_request_t *req, ucp_ep_ext_proto_t* ep_ext, 165 ucs_status_t status) 
166{ 167 /* dequeue request before complete */ 168 ucp_request_t *check_req UCS_V_UNUSED = 169 ucs_queue_pull_elem_non_empty(&ep_ext->stream.match_q, ucp_request_t, 170 recv.queue); 171 ucs_assert(check_req == req); 172 ucs_assert((req->recv.stream.offset > 0) || UCS_STATUS_IS_ERR(status)); 173 174 req->recv.stream.length = req->recv.stream.offset; 175 ucs_trace_req("completing stream receive request %p (%p) " 176 UCP_REQUEST_FLAGS_FMT" count %zu, %s", 177 req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags), 178 req->recv.stream.length, ucs_status_string(status)); 179 UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status); 180 ucp_request_complete(req, recv.stream.cb, status, req->recv.stream.length, 181 req->user_data); 182} 183 184static UCS_F_ALWAYS_INLINE int 185ucp_request_can_complete_stream_recv(ucp_request_t *req) 186{ 187 /* NOTE: first check is needed to avoid heavy "%" operation if request is 188 * completely filled */ 189 if (req->recv.stream.offset == req->recv.length) { 190 return 1; 191 } 192 193 if (req->flags & UCP_REQUEST_FLAG_STREAM_RECV_WAITALL) { 194 return 0; 195 } 196 197 /* 0-length stream recv is meaningless if this was not requested explicitely */ 198 if (req->recv.stream.offset == 0) { 199 return 0; 200 } 201 202 if (ucs_likely(UCP_DT_IS_CONTIG(req->recv.datatype))) { 203 return req->recv.stream.offset % 204 ucp_contig_dt_elem_size(req->recv.datatype) == 0; 205 } 206 207 /* Currently, all data types except contig has granularity 1 byte */ 208 return 1; 209} 210 211/* 212 * @return Whether completed. 213 * *req_status if filled with the completion status if completed. 
 */
static int UCS_F_ALWAYS_INLINE
ucp_request_try_send(ucp_request_t *req, ucs_status_t *req_status,
                     unsigned pending_flags)
{
    ucs_status_t status;

    /* coverity wrongly resolves (*req).send.uct.func to test_uct_pending::pending_send_op_ok */
    /* coverity[address_free] */
    status = req->send.uct.func(&req->send.uct);
    if (status == UCS_OK) {
        /* Completed the operation */
        *req_status = UCS_OK;
        return 1;
    } else if (status == UCS_INPROGRESS) {
        /* Not completed, but made progress */
        return 0;
    } else if (status != UCS_ERR_NO_RESOURCE) {
        /* Unexpected error - complete the request with a failure status */
        ucp_request_complete_send(req, status);
        *req_status = status;
        return 1;
    }

    /* No send resources, try to add to pending queue */
    ucs_assert(status == UCS_ERR_NO_RESOURCE);
    return ucp_request_pending_add(req, req_status, pending_flags);
}

/**
 * Start sending a request.
 *
 * @param [in]  req             Request to start.
 * @param [in]  pending_flags   flags to be passed to UCT if request will be
 *                              added to pending queue.
 *
 * @return UCS_OK - completed (callback will not be called)
 *         UCS_INPROGRESS - started but not completed
 *         other error - failure
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send(ucp_request_t *req, unsigned pending_flags)
{
    ucs_status_t status = UCS_ERR_NOT_IMPLEMENTED;

    /* retry until the request completes, is queued as pending, or fails */
    while (!ucp_request_try_send(req, &status, pending_flags));
    return status;
}

/* Finalize the generic-datatype pack state of a send request, if any */
static UCS_F_ALWAYS_INLINE
void ucp_request_send_generic_dt_finish(ucp_request_t *req)
{
    ucp_dt_generic_t *dt;
    if (UCP_DT_IS_GENERIC(req->send.datatype)) {
        dt = ucp_dt_generic(req->send.datatype);
        ucs_assert(NULL != dt);
        dt->ops.finish(req->send.state.dt.dt.generic.state);
    }
}

/* Finalize the generic-datatype unpack state of a receive request, if any */
static UCS_F_ALWAYS_INLINE
void ucp_request_recv_generic_dt_finish(ucp_request_t *req)
{
    ucp_dt_generic_t *dt;
    if (UCP_DT_IS_GENERIC(req->recv.datatype)) {
        dt = ucp_dt_generic(req->recv.datatype);
        ucs_assert(NULL != dt);
        dt->ops.finish(req->recv.state.dt.generic.state);
    }
}

/**
 * Initialize the datatype-specific part of the send state.
 *
 * @param [in] req       Send request to initialize.
 * @param [in] datatype  UCP datatype of the send buffer.
 * @param [in] dt_count  Number of datatype elements (IOV entry count for IOV,
 *                       element count for generic datatypes).
 */
static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_init(ucp_request_t *req, ucp_datatype_t datatype,
                            size_t dt_count)
{
    ucp_dt_generic_t *dt_gen;
    void *state_gen;

    /* these fields are assigned later by ucp_request_send_state_reset();
     * mark them undefined so valgrind catches premature reads */
    VALGRIND_MAKE_MEM_UNDEFINED(&req->send.state.uct_comp.count,
                                sizeof(req->send.state.uct_comp.count));
    VALGRIND_MAKE_MEM_UNDEFINED(&req->send.state.dt.offset,
                                sizeof(req->send.state.dt.offset));

    req->send.state.uct_comp.func = NULL;

    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        req->send.state.dt.dt.contig.md_map = 0;
        return;
    case UCP_DATATYPE_IOV:
        req->send.state.dt.dt.iov.iovcnt_offset = 0;
        req->send.state.dt.dt.iov.iov_offset = 0;
        req->send.state.dt.dt.iov.iovcnt = dt_count;
        req->send.state.dt.dt.iov.dt_reg = NULL;
        return;
    case UCP_DATATYPE_GENERIC:
        dt_gen = ucp_dt_generic(datatype);
        state_gen = dt_gen->ops.start_pack(dt_gen->context, req->send.buffer,
                                           dt_count);
        req->send.state.dt.dt.generic.state = state_gen;
        return;
    default:
        ucs_fatal("Invalid data type");
    }
}

/**
 * Reset the send state before (re)starting a protocol.
 *
 * @param [in] req      Send request.
 * @param [in] comp_cb  UCT completion callback, used by the zero-copy/RMA
 *                      protocol cases below.
 * @param [in] proto    Internal UCP protocol id (UCP_REQUEST_SEND_PROTO_*).
 */
static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_reset(ucp_request_t *req,
                             uct_completion_callback_t comp_cb, unsigned proto)
{
    switch (proto) {
    case UCP_REQUEST_SEND_PROTO_RMA:
        ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype));
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_RNDV_GET:
    case UCP_REQUEST_SEND_PROTO_RNDV_PUT:
    case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
        req->send.state.uct_comp.func = comp_cb;
        req->send.state.uct_comp.count = 0;
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_BCOPY_AM:
        req->send.state.dt.offset = 0;
        break;
    default:
        ucs_fatal("unknown protocol");
    }
}

/**
 * Advance state of send request after UCT operation. This function applies
 * @a new_dt_state to @a req request according to @a proto protocol. Also, UCT
 * completion counter will be incremented if @a proto requires it.
 *
 * @param [inout] req             Send request.
 * @param [in]    new_dt_state    State which was progressed by
 *                                @ref ucp_dt_pack or @ref ucp_dt_iov_copy_uct.
 * @param [in]    proto           Internal UCP protocol identifier
 *                                (UCP_REQUEST_SEND_PROTO_*)
 * @param [in]    status          Status of the last UCT operation which
 *                                progressed @a proto protocol.
 */
static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_advance(ucp_request_t *req,
                               const ucp_dt_state_t *new_dt_state,
                               unsigned proto, ucs_status_t status)
{
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        /* Don't advance after failed operation in order to continue on next try
         * from last valid point.
         */
        return;
    }

    switch (proto) {
    case UCP_REQUEST_SEND_PROTO_RMA:
        if (status == UCS_INPROGRESS) {
            ++req->send.state.uct_comp.count;
        }
        break;
    case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_RNDV_GET:
    case UCP_REQUEST_SEND_PROTO_RNDV_PUT:
        if (status == UCS_INPROGRESS) {
            ++req->send.state.uct_comp.count;
        }
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_BCOPY_AM:
        ucs_assert(new_dt_state != NULL);
        if (UCP_DT_IS_CONTIG(req->send.datatype)) {
            /* contig keeps only the offset; the rest of the state is static */
            /* cppcheck-suppress nullPointer */
            req->send.state.dt.offset = new_dt_state->offset;
        } else {
            /* cppcheck-suppress nullPointer */
            req->send.state.dt = *new_dt_state;
        }
        break;
    default:
        ucs_fatal("unknown protocol");
    }

    /* offset is not used for RMA */
    ucs_assert((proto == UCP_REQUEST_SEND_PROTO_RMA) ||
               (req->send.state.dt.offset <= req->send.length));
}

/* Register the send buffer on the given set of memory domains */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg(ucp_request_t *req, ucp_md_map_t md_map,
                            unsigned uct_flags)
{
    return ucp_request_memory_reg(req->send.ep->worker->context, md_map,
                                  (void*)req->send.buffer, req->send.length,
                                  req->send.datatype, &req->send.state.dt,
                                  req->send.mem_type, req, uct_flags);
}

/* Register the send buffer for a lane, merging with a previously registered
 * MD map; no-op when the lane's MD does not require a memory handle */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg_lane_check(ucp_request_t *req, ucp_lane_index_t lane,
                                       ucp_md_map_t prev_md_map, unsigned uct_flags)
{
    ucp_md_map_t md_map;

    if (!(ucp_ep_md_attr(req->send.ep,
                         lane)->cap.flags & UCT_MD_FLAG_NEED_MEMH)) {
        return UCS_OK;
    }

    ucs_assert(ucp_ep_md_attr(req->send.ep,
                              lane)->cap.flags & UCT_MD_FLAG_REG);
    md_map = UCS_BIT(ucp_ep_md_index(req->send.ep, lane)) | prev_md_map;
    return ucp_request_send_buffer_reg(req, md_map, uct_flags);
}

/* Register the send buffer for a single lane (empty previous MD map) */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg_lane(ucp_request_t *req, ucp_lane_index_t lane,
                                 unsigned uct_flags)
{
    return ucp_request_send_buffer_reg_lane_check(req, lane, 0, uct_flags);
}

/* Add one more lane to the registration map of a send request */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_send_request_add_reg_lane(ucp_request_t *req, ucp_lane_index_t lane)
{
    /* Add new lane to registration map */
    ucp_md_map_t md_map;

    if (ucs_likely(UCP_DT_IS_CONTIG(req->send.datatype))) {
        md_map = req->send.state.dt.dt.contig.md_map;
    } else if (UCP_DT_IS_IOV(req->send.datatype) &&
               (req->send.state.dt.dt.iov.dt_reg != NULL)) {
        /* dt_reg can be NULL if underlying UCT TL doesn't require
         * memory handle for local AM/GET/PUT operations
         * (i.e. UCT_MD_FLAG_NEED_MEMH is not set) */
        /* Can use the first DT registration element, since
         * they have the same MD maps */
        md_map = req->send.state.dt.dt.iov.dt_reg[0].md_map;
    } else {
        md_map = 0;
    }

    ucs_assert(ucs_popcount(md_map) <= UCP_MAX_OP_MDS);
    return ucp_request_send_buffer_reg_lane_check(req, lane, md_map, 0);
}

/* Register the receive buffer on the given memory domains; registration
 * failures are hidden (best-effort) */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_recv_buffer_reg(ucp_request_t *req, ucp_md_map_t md_map,
                            size_t length)
{
    return ucp_request_memory_reg(req->recv.worker->context, md_map,
                                  req->recv.buffer, length,
                                  req->recv.datatype, &req->recv.state,
                                  req->recv.mem_type, req,
                                  UCT_MD_MEM_FLAG_HIDE_ERRORS);
}

/* Deregister the send buffer memory */
static UCS_F_ALWAYS_INLINE void ucp_request_send_buffer_dereg(ucp_request_t *req)
{
    ucp_request_memory_dereg(req->send.ep->worker->context, req->send.datatype,
                             &req->send.state.dt, req);
}

/* Deregister the receive buffer memory */
static UCS_F_ALWAYS_INLINE void ucp_request_recv_buffer_dereg(ucp_request_t *req)
{
    ucp_request_memory_dereg(req->recv.worker->context, req->recv.datatype,
                             &req->recv.state, req);
}

/* Busy-poll the worker until all outstanding UCT operations of the request
 * are completed (uct_comp.count reaches 0) */
static UCS_F_ALWAYS_INLINE void
ucp_request_wait_uct_comp(ucp_request_t *req)
{
    while (req->send.state.uct_comp.count > 0) {
        ucp_worker_progress(req->send.ep->worker);
    }
}
487 488static UCS_F_ALWAYS_INLINE void 489ucp_request_unpack_contig(ucp_request_t *req, void *buf, const void *data, 490 size_t length) 491{ 492 if (ucs_likely(UCP_MEM_IS_ACCESSIBLE_FROM_CPU(req->recv.mem_type))) { 493 UCS_PROFILE_NAMED_CALL("memcpy_recv", ucs_memcpy_relaxed, buf, 494 data, length); 495 } else { 496 ucp_mem_type_unpack(req->recv.worker, buf, data, length, 497 req->recv.mem_type); 498 } 499} 500 501/** 502 * Unpack receive data to a request 503 * 504 * req - receive request 505 * data - data to unpack 506 * length - 507 * offset - offset of received data within the request, for OOO fragments 508 * 509 * 510 */ 511static UCS_F_ALWAYS_INLINE ucs_status_t 512ucp_request_recv_data_unpack(ucp_request_t *req, const void *data, 513 size_t length, size_t offset, int last) 514{ 515 ucp_dt_generic_t *dt_gen; 516 ucs_status_t status; 517 518 ucs_assert(req->status == UCS_OK); 519 520 ucp_trace_req(req, "unpack recv_data req_len %zu data_len %zu offset %zu last: %s", 521 req->recv.length, length, offset, last ? 
"yes" : "no"); 522 523 if (ucs_unlikely((length + offset) > req->recv.length)) { 524 return ucp_request_recv_msg_truncated(req, length, offset); 525 } 526 527 switch (req->recv.datatype & UCP_DATATYPE_CLASS_MASK) { 528 case UCP_DATATYPE_CONTIG: 529 ucp_request_unpack_contig(req, 530 UCS_PTR_BYTE_OFFSET(req->recv.buffer, offset), 531 data, length); 532 return UCS_OK; 533 534 case UCP_DATATYPE_IOV: 535 if (offset != req->recv.state.offset) { 536 ucp_dt_iov_seek(req->recv.buffer, req->recv.state.dt.iov.iovcnt, 537 offset - req->recv.state.offset, 538 &req->recv.state.dt.iov.iov_offset, 539 &req->recv.state.dt.iov.iovcnt_offset); 540 req->recv.state.offset = offset; 541 } 542 UCS_PROFILE_CALL(ucp_dt_iov_scatter, req->recv.buffer, 543 req->recv.state.dt.iov.iovcnt, data, length, 544 &req->recv.state.dt.iov.iov_offset, 545 &req->recv.state.dt.iov.iovcnt_offset); 546 req->recv.state.offset += length; 547 return UCS_OK; 548 549 case UCP_DATATYPE_GENERIC: 550 dt_gen = ucp_dt_generic(req->recv.datatype); 551 status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack, 552 req->recv.state.dt.generic.state, 553 offset, data, length); 554 if (last || (status != UCS_OK)) { 555 UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, 556 req->recv.state.dt.generic.state); 557 } 558 return status; 559 560 default: 561 ucs_fatal("unexpected datatype=%lx", req->recv.datatype); 562 } 563} 564 565static UCS_F_ALWAYS_INLINE ucs_status_t 566ucp_recv_desc_init(ucp_worker_h worker, void *data, size_t length, 567 int data_offset, unsigned am_flags, uint16_t hdr_len, 568 uint16_t rdesc_flags, int priv_length, 569 ucp_recv_desc_t **rdesc_p) 570{ 571 ucp_recv_desc_t *rdesc; 572 void *data_hdr; 573 ucs_status_t status; 574 575 if (ucs_unlikely(am_flags & UCT_CB_PARAM_FLAG_DESC)) { 576 /* slowpath */ 577 ucs_assert(priv_length <= UCP_WORKER_HEADROOM_PRIV_SIZE); 578 data_hdr = UCS_PTR_BYTE_OFFSET(data, -data_offset); 579 rdesc = (ucp_recv_desc_t *)data_hdr - 1; 580 rdesc->flags = 
rdesc_flags | UCP_RECV_DESC_FLAG_UCT_DESC; 581 rdesc->uct_desc_offset = UCP_WORKER_HEADROOM_PRIV_SIZE - priv_length; 582 status = UCS_INPROGRESS; 583 } else { 584 rdesc = (ucp_recv_desc_t*)ucs_mpool_get_inline(&worker->am_mp); 585 if (rdesc == NULL) { 586 ucs_error("ucp recv descriptor is not allocated"); 587 return UCS_ERR_NO_MEMORY; 588 } 589 590 /* No need to initialize rdesc->priv_length here, because it is only 591 * needed for releasing UCT descriptor. */ 592 593 rdesc->flags = rdesc_flags; 594 status = UCS_OK; 595 memcpy(UCS_PTR_BYTE_OFFSET(rdesc + 1, data_offset), data, length); 596 } 597 598 rdesc->length = length + data_offset; 599 rdesc->payload_offset = hdr_len; 600 *rdesc_p = rdesc; 601 return status; 602} 603 604static UCS_F_ALWAYS_INLINE void 605ucp_recv_desc_release(ucp_recv_desc_t *rdesc) 606{ 607 void *uct_desc; 608 609 ucs_trace_req("release receive descriptor %p", rdesc); 610 if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_UCT_DESC)) { 611 /* uct desc is slowpath */ 612 uct_desc = UCS_PTR_BYTE_OFFSET(rdesc, -rdesc->uct_desc_offset); 613 uct_iface_release_desc(uct_desc); 614 } else { 615 ucs_mpool_put_inline(rdesc); 616 } 617} 618 619static UCS_F_ALWAYS_INLINE ucp_lane_index_t 620ucp_send_request_get_am_bw_lane(ucp_request_t *req) 621{ 622 ucp_lane_index_t lane; 623 624 lane = ucp_ep_config(req->send.ep)-> 625 key.am_bw_lanes[req->send.msg_proto.am_bw_index]; 626 ucs_assertv(lane != UCP_NULL_LANE, "req->send.msg_proto.am_bw_index=%d", 627 req->send.msg_proto.am_bw_index); 628 return lane; 629} 630 631static UCS_F_ALWAYS_INLINE void 632ucp_send_request_next_am_bw_lane(ucp_request_t *req) 633{ 634 ucp_lane_index_t am_bw_index = ++req->send.msg_proto.am_bw_index; 635 ucp_ep_config_t *config = ucp_ep_config(req->send.ep); 636 637 if ((am_bw_index >= UCP_MAX_LANES) || 638 (config->key.am_bw_lanes[am_bw_index] == UCP_NULL_LANE)) { 639 req->send.msg_proto.am_bw_index = 0; 640 } 641} 642 643static UCS_F_ALWAYS_INLINE uintptr_t 
ucp_request_get_dest_ep_ptr(ucp_request_t *req) 644{ 645 /* This function may return 0, but in such cases the message should not be 646 * sent at all because the am_lane would point to a wireup (proxy) endpoint. 647 * So only the receiver side has an assertion that ep_ptr != 0. 648 */ 649 return ucp_ep_dest_ep_ptr(req->send.ep); 650} 651 652static UCS_F_ALWAYS_INLINE uint32_t 653ucp_request_param_flags(const ucp_request_param_t *param) 654{ 655 return (param->op_attr_mask & UCP_OP_ATTR_FIELD_FLAGS) ? 656 param->flags : 0; 657} 658 659static UCS_F_ALWAYS_INLINE ucp_datatype_t 660ucp_request_param_datatype(const ucp_request_param_t *param) 661{ 662 return (param->op_attr_mask & UCP_OP_ATTR_FIELD_DATATYPE) ? 663 param->datatype : ucp_dt_make_contig(1); 664} 665 666#endif 667