1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #ifdef HAVE_CONFIG_H
8 # include "config.h"
9 #endif
10
11 #include <ucp/core/ucp_ep.h>
12 #include <ucp/core/ucp_worker.h>
13 #include <ucp/core/ucp_context.h>
14 #include <ucp/core/ucp_request.h>
15 #include <ucp/core/ucp_request.inl>
16 #include <ucp/stream/stream.h>
17
18 #include <ucs/datastruct/mpool.inl>
19 #include <ucs/profile/profile.h>
20
21
22 /* @verbatim
23 * Data layout within Stream AM
24 * |---------------------------------------------------------------------------------------------------------------------------|
25 * | ucp_recv_desc_t | \ / | ucp_stream_am_data_t | payload |
26 * |-----------------------------------------------------------------| \ / |----------------------|-------------------------|
27 * | stream_queue | length | payload_offset | flags | \/ | am_header | |
28 * | tag_list (not used) | | | | /\ | rdesc | |
29 * |---------------------|----------------|----------------|---------| / \ |----------------------|-------------------------|
30 * | 4 * sizeof(ptr) | 32 bits | 32 bits | 16 bits | / \ | 64 bits | up to TL AM buffer size |
31 * |---------------------------------------------------------------------------------------------------------------------------|
32 * @endverbatim
33 *
34 * stream_queue is an entry link in the "unexpected" queue per endpoint
35 * length is an actual size of 'payload'
36 * payload_offset is a distance between 'ucp_recv_desc_t *' and 'payload *'
 * X              (the crossed area in the diagram) is an optional gap between
 *                ucp_recv_desc_t and ucp_stream_am_data_t. It appears after
 *                part of the payload was consumed, i.e. when 'length' was
 *                greater than the user buffer size passed to
 *                @ref ucp_stream_recv_nb
 * am_header      is the active message header; it is no longer valid once
 *                ucp_recv_desc_t is initialized and the offsets are set up
 * rdesc          is a pointer to 'ucp_recv_desc_t *'. It is needed to access
 *                'ucp_recv_desc_t *' inside @ref ucp_stream_data_release after
 *                the buffer was returned to the user by
 *                @ref ucp_stream_recv_data_nb as a pointer to 'payload'
46 */
47
48
/* Address of the current (not yet consumed) payload of a stream receive
 * descriptor: 'payload_offset' bytes from the start of the descriptor.
 * ucp_stream_rdesc_advance() grows 'payload_offset' as the payload is
 * partially consumed, so this always points at the first unread byte. */
#define ucp_stream_rdesc_payload(_rdesc) \
    (UCS_PTR_BYTE_OFFSET((_rdesc), (_rdesc)->payload_offset))
51
52
/* Stream AM data header located immediately before the current payload of
 * '_rdesc'. The backwards offset is a signed value rather than a negated
 * unsigned sizeof(), so moving the pointer back does not rely on unsigned
 * wrap-around and an implementation-defined conversion to a signed offset. */
#define ucp_stream_rdesc_am_data(_rdesc) \
    ((ucp_stream_am_data_t *) \
     UCS_PTR_BYTE_OFFSET(ucp_stream_rdesc_payload(_rdesc), \
                         -(ssize_t)sizeof(ucp_stream_am_data_t)))
57
58
/* Recover the receive descriptor from a data pointer that
 * ucp_stream_recv_data_nb() handed to the user. That function stores the
 * descriptor in the 'rdesc' field of the ucp_stream_am_data_t header that
 * immediately precedes the returned pointer. The argument is parenthesized
 * before the cast, so expressions such as 'ptr + n' are converted as a whole. */
#define ucp_stream_rdesc_from_data(_data) \
    ((((ucp_stream_am_data_t *)(_data)) - 1)->rdesc)
61
62
63 static UCS_F_ALWAYS_INLINE ucp_recv_desc_t *
ucp_stream_rdesc_dequeue(ucp_ep_ext_proto_t * ep_ext)64 ucp_stream_rdesc_dequeue(ucp_ep_ext_proto_t *ep_ext)
65 {
66 ucp_recv_desc_t *rdesc = ucs_queue_pull_elem_non_empty(&ep_ext->stream.match_q,
67 ucp_recv_desc_t,
68 stream_queue);
69 ucs_assert(ucp_stream_ep_has_data(ep_ext));
70 if (ucs_unlikely(ucs_queue_is_empty(&ep_ext->stream.match_q))) {
71 ucp_ep_from_ext_proto(ep_ext)->flags &= ~UCP_EP_FLAG_STREAM_HAS_DATA;
72 if (ucp_stream_ep_is_queued(ep_ext)) {
73 ucp_stream_ep_dequeue(ep_ext);
74 }
75 }
76
77 return rdesc;
78 }
79
80 static UCS_F_ALWAYS_INLINE ucp_recv_desc_t *
ucp_stream_rdesc_get(ucp_ep_ext_proto_t * ep_ext)81 ucp_stream_rdesc_get(ucp_ep_ext_proto_t *ep_ext)
82 {
83 ucp_recv_desc_t *rdesc = ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
84 ucp_recv_desc_t,
85 stream_queue);
86
87 ucs_assert(ucp_stream_ep_has_data(ep_ext));
88 ucs_trace_data("ep %p, rdesc %p with %u stream bytes",
89 ucp_ep_from_ext_proto(ep_ext), rdesc, rdesc->length);
90
91 return rdesc;
92 }
93
94 static UCS_F_ALWAYS_INLINE ucs_status_ptr_t
ucp_stream_recv_data_nb_nolock(ucp_ep_h ep,size_t * length)95 ucp_stream_recv_data_nb_nolock(ucp_ep_h ep, size_t *length)
96 {
97 ucp_ep_ext_proto_t *ep_ext = ucp_ep_ext_proto(ep);
98 ucp_recv_desc_t *rdesc;
99 ucp_stream_am_data_t *am_data;
100
101 if (ucs_unlikely(!ucp_stream_ep_has_data(ep_ext))) {
102 return UCS_STATUS_PTR(UCS_OK);
103 }
104
105 rdesc = ucp_stream_rdesc_dequeue(ep_ext);
106
107 *length = rdesc->length;
108 am_data = ucp_stream_rdesc_am_data(rdesc);
109 am_data->rdesc = rdesc;
110 return am_data + 1;
111 }
112
113 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_data_nb, (ep, length),
114 ucp_ep_h ep, size_t *length)
115 {
116 ucs_status_ptr_t status_ptr;
117
118 UCP_CONTEXT_CHECK_FEATURE_FLAGS(ep->worker->context, UCP_FEATURE_STREAM,
119 return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM));
120
121 UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
122 status_ptr = ucp_stream_recv_data_nb_nolock(ep, length);
123 UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
124
125 return status_ptr;
126 }
127
128 static UCS_F_ALWAYS_INLINE void
ucp_stream_rdesc_dequeue_and_release(ucp_recv_desc_t * rdesc,ucp_ep_ext_proto_t * ep_ext)129 ucp_stream_rdesc_dequeue_and_release(ucp_recv_desc_t *rdesc,
130 ucp_ep_ext_proto_t *ep_ext)
131 {
132 ucs_assert(ucp_stream_ep_has_data(ep_ext));
133 ucs_assert(rdesc == ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
134 ucp_recv_desc_t,
135 stream_queue));
136 ucp_stream_rdesc_dequeue(ep_ext);
137 ucp_recv_desc_release(rdesc);
138 }
139
140 UCS_PROFILE_FUNC_VOID(ucp_stream_data_release, (ep, data),
141 ucp_ep_h ep, void *data)
142 {
143 ucp_recv_desc_t *rdesc = ucp_stream_rdesc_from_data(data);
144
145 UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
146
147 ucp_recv_desc_release(rdesc);
148
149 UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
150 }
151
152 static UCS_F_ALWAYS_INLINE ssize_t
ucp_stream_rdata_unpack(const void * rdata,size_t length,ucp_request_t * dst_req)153 ucp_stream_rdata_unpack(const void *rdata, size_t length, ucp_request_t *dst_req)
154 {
155 size_t valid_len;
156 int last;
157 ucs_status_t status;
158
159 /* Truncated error is not actual for stream, need to adjust */
160 valid_len = dst_req->recv.length - dst_req->recv.stream.offset;
161 if (valid_len <= length) {
162 last = (valid_len == length);
163 } else {
164 valid_len = length;
165 last = !(dst_req->flags & UCP_REQUEST_FLAG_STREAM_RECV_WAITALL);
166 }
167
168 status = ucp_request_recv_data_unpack(dst_req, rdata, valid_len,
169 dst_req->recv.stream.offset, last);
170 if (ucs_likely(status == UCS_OK)) {
171 dst_req->recv.stream.offset += valid_len;
172 ucs_trace_data("unpacked %zd bytes of stream data %p",
173 valid_len, rdata);
174 return valid_len;
175 }
176
177 ucs_assert(status != UCS_ERR_MESSAGE_TRUNCATED);
178 return status;
179 }
180
181 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_rdesc_advance(ucp_recv_desc_t * rdesc,ssize_t offset,ucp_ep_ext_proto_t * ep_ext)182 ucp_stream_rdesc_advance(ucp_recv_desc_t *rdesc, ssize_t offset,
183 ucp_ep_ext_proto_t *ep_ext)
184 {
185 ucs_assert(offset <= rdesc->length);
186
187 if (ucs_unlikely(offset < 0)) {
188 return (ucs_status_t)offset;
189 } else if (ucs_likely(offset == rdesc->length)) {
190 ucp_stream_rdesc_dequeue_and_release(rdesc, ep_ext);
191 } else {
192 rdesc->length -= offset;
193 rdesc->payload_offset += offset;
194 }
195
196 return UCS_OK;
197 }
198
199 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_process_rdesc_inplace(ucp_recv_desc_t * rdesc,ucp_datatype_t dt,void * buffer,size_t count,size_t length,ucp_ep_ext_proto_t * ep_ext)200 ucp_stream_process_rdesc_inplace(ucp_recv_desc_t *rdesc, ucp_datatype_t dt,
201 void *buffer, size_t count, size_t length,
202 ucp_ep_ext_proto_t *ep_ext)
203 {
204 ucp_worker_h worker = ucp_ep_from_ext_proto(ep_ext)->worker;
205 ucs_status_t status;
206 ssize_t unpacked;
207 ucs_memory_type_t mem_type;
208
209 mem_type = ucp_memory_type_detect(worker->context, buffer, length);
210 status = ucp_dt_unpack_only(worker, buffer, count, dt, mem_type,
211 ucp_stream_rdesc_payload(rdesc), length, 0);
212
213 unpacked = ucs_likely(status == UCS_OK) ? length : status;
214
215 return ucp_stream_rdesc_advance(rdesc, unpacked, ep_ext);
216 }
217
218 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_process_rdesc(ucp_recv_desc_t * rdesc,ucp_ep_ext_proto_t * ep_ext,ucp_request_t * req)219 ucp_stream_process_rdesc(ucp_recv_desc_t *rdesc, ucp_ep_ext_proto_t *ep_ext,
220 ucp_request_t *req)
221 {
222 ssize_t unpacked;
223
224 unpacked = ucp_stream_rdata_unpack(ucp_stream_rdesc_payload(rdesc),
225 rdesc->length, req);
226 ucs_assert(req->recv.stream.offset <= req->recv.length);
227
228 return ucp_stream_rdesc_advance(rdesc, unpacked, ep_ext);
229 }
230
231 static UCS_F_ALWAYS_INLINE void
ucp_stream_recv_request_init(ucp_request_t * req,ucp_ep_h ep,void * buffer,size_t count,size_t length,ucp_datatype_t datatype,const ucp_request_param_t * param)232 ucp_stream_recv_request_init(ucp_request_t *req, ucp_ep_h ep, void *buffer,
233 size_t count, size_t length,
234 ucp_datatype_t datatype,
235 const ucp_request_param_t *param)
236 {
237 uint32_t flags = ucp_request_param_flags(param);
238
239 req->flags = UCP_REQUEST_FLAG_STREAM_RECV |
240 ((flags & UCP_STREAM_RECV_FLAG_WAITALL) ?
241 UCP_REQUEST_FLAG_STREAM_RECV_WAITALL : 0);
242 #if UCS_ENABLE_ASSERT
243 req->status = UCS_OK; /* for ucp_request_recv_data_unpack() */
244 #endif
245 req->recv.stream.length = 0;
246 req->recv.stream.offset = 0;
247
248 ucp_dt_recv_state_init(&req->recv.state, buffer, datatype, count);
249
250 req->recv.worker = ep->worker;
251 req->recv.buffer = buffer;
252 req->recv.datatype = datatype;
253 req->recv.length = ucs_likely(!UCP_DT_IS_GENERIC(datatype)) ? length :
254 ucp_dt_length(datatype, count, NULL, &req->recv.state);
255 req->recv.mem_type = ucp_memory_type_detect(ep->worker->context,
256 (void*)buffer, req->recv.length);
257
258 if (param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) {
259 req->flags |= UCP_REQUEST_FLAG_CALLBACK;
260 req->recv.stream.cb = param->cb.recv_stream;
261 req->user_data = (param->op_attr_mask & UCP_OP_ATTR_FIELD_USER_DATA) ?
262 param->user_data : NULL;
263 }
264 }
265
266 static UCS_F_ALWAYS_INLINE int
ucp_stream_recv_nb_is_inplace(ucp_ep_ext_proto_t * ep_ext,size_t dt_length)267 ucp_stream_recv_nb_is_inplace(ucp_ep_ext_proto_t *ep_ext, size_t dt_length)
268 {
269 return ucp_stream_ep_has_data(ep_ext) &&
270 (ucp_stream_rdesc_get(ep_ext)->length >= dt_length);
271 }
272
273 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb,
274 (ep, buffer, count, datatype, cb, length, flags),
275 ucp_ep_h ep, void *buffer, size_t count,
276 ucp_datatype_t datatype, ucp_stream_recv_callback_t cb,
277 size_t *length, unsigned flags)
278 {
279 ucp_request_param_t param = {
280 .op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE |
281 UCP_OP_ATTR_FIELD_CALLBACK |
282 UCP_OP_ATTR_FIELD_FLAGS,
283 .cb.recv_stream = (ucp_stream_recv_nbx_callback_t)cb,
284 .flags = flags,
285 .datatype = datatype
286 };
287
288 return ucp_stream_recv_nbx(ep, buffer, count, length, ¶m);
289 }
290
291 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nbx,
292 (ep, buffer, count, length, param),
293 ucp_ep_h ep, void *buffer, size_t count, size_t *length,
294 const ucp_request_param_t *param)
295 {
296 ucs_status_t status = UCS_OK;
297 ucp_ep_ext_proto_t *ep_ext = ucp_ep_ext_proto(ep);
298 ucp_datatype_t datatype;
299 size_t dt_length;
300 ucp_request_t *req;
301 ucp_recv_desc_t *rdesc;
302 uint32_t attr_mask;
303
304 UCP_CONTEXT_CHECK_FEATURE_FLAGS(ep->worker->context, UCP_FEATURE_STREAM,
305 return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM));
306 UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
307
308 attr_mask = param->op_attr_mask &
309 (UCP_OP_ATTR_FIELD_DATATYPE | UCP_OP_ATTR_FLAG_NO_IMM_CMPL);
310 if (ucs_likely(attr_mask == 0)) {
311 datatype = ucp_dt_make_contig(1);
312 dt_length = count; /* use dt_lendth to suppress coverity false positive */
313 if (ucs_likely(ucp_stream_recv_nb_is_inplace(ep_ext, count))) {
314 status = ucp_stream_process_rdesc_inplace(ucp_stream_rdesc_get(ep_ext),
315 datatype, buffer, count,
316 dt_length, ep_ext);
317 *length = count;
318 goto out_status;
319 }
320 } else if (attr_mask == UCP_OP_ATTR_FIELD_DATATYPE) {
321 datatype = param->datatype;
322 if (!UCP_DT_IS_GENERIC(datatype)) {
323 dt_length = ucp_dt_length(datatype, count, buffer, NULL);
324 if (ucp_stream_recv_nb_is_inplace(ep_ext, dt_length)) {
325 status = ucp_stream_process_rdesc_inplace(ucp_stream_rdesc_get(ep_ext),
326 datatype, buffer, count,
327 dt_length, ep_ext);
328 *length = dt_length;
329 goto out_status;
330 }
331 } else {
332 dt_length = 0;
333 }
334 } else {
335 datatype = ucp_dt_make_contig(1);
336 dt_length = count;
337 }
338
339 if (ucs_unlikely(param->op_attr_mask & UCP_OP_ATTR_FLAG_FORCE_IMM_CMPL)) {
340 status = UCS_ERR_NO_RESOURCE;
341 goto out_status;
342 }
343
344 req = ucp_request_get_param(ep->worker, param,
345 {
346 status = UCS_ERR_NO_MEMORY;
347 goto out_status;
348 });
349
350 ucp_stream_recv_request_init(req, ep, buffer, count, dt_length, datatype,
351 param);
352
353 /* OK, lets obtain all arrived data which matches the recv size */
354 while ((req->recv.stream.offset < req->recv.length) &&
355 ucp_stream_ep_has_data(ep_ext)) {
356
357 rdesc = ucp_stream_rdesc_get(ep_ext);
358 status = ucp_stream_process_rdesc(rdesc, ep_ext, req);
359 if (ucs_unlikely(status != UCS_OK)) {
360 goto out_put_request;
361 }
362
363 /*
364 * NOTE: generic datatype can be completed with any amount of data to
365 * avoid extra logic in ucp_stream_process_rdesc, exception is
366 * WAITALL flag
367 */
368 if (ucs_unlikely(UCP_DT_IS_GENERIC(req->recv.datatype)) &&
369 !(req->flags & UCP_REQUEST_FLAG_STREAM_RECV_WAITALL)) {
370 break;
371 }
372 }
373
374 ucs_assert(req->recv.stream.offset <= req->recv.length);
375
376 if (ucp_request_can_complete_stream_recv(req)) {
377 *length = req->recv.stream.offset;
378 } else {
379 ucs_assert(!ucp_stream_ep_has_data(ep_ext));
380 ucs_queue_push(&ep_ext->stream.match_q, &req->recv.queue);
381 req += 1;
382 goto out;
383 }
384
385 out_put_request:
386 ucp_request_put_param(param, req);
387
388 out_status:
389 req = UCS_STATUS_PTR(status);
390
391 out:
392 UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
393 return req;
394 }
395
/**
 * Deliver one incoming stream AM payload on an endpoint.
 *
 * If the endpoint has no queued unexpected data, the payload is first used to
 * fill receive requests waiting on the endpoint match queue. Each request that
 * can be completed is completed. Any remainder is stored as an unexpected
 * receive descriptor on the same match queue, and the endpoint is flagged
 * with UCP_EP_FLAG_STREAM_HAS_DATA.
 *
 * @param worker    Worker whose am_mp pool provides descriptors for copied
 *                  data.
 * @param ep_ext    Endpoint protocol extension owning the stream match queue.
 * @param am_data   Received AM data, starting with the stream AM header.
 * @param length    Payload length, excluding the stream AM header.
 * @param am_flags  UCT AM callback flags; with UCT_CB_PARAM_FLAG_DESC the UCT
 *                  descriptor is reused in place instead of copying.
 *
 * @return UCS_OK if the whole payload was consumed by waiting requests,
 *         UCS_INPROGRESS if a remainder was queued as unexpected data.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_am_data_process(ucp_worker_t *worker, ucp_ep_ext_proto_t *ep_ext,
                           ucp_stream_am_data_t *am_data, size_t length,
                           unsigned am_flags)
{
    ucp_recv_desc_t rdesc_tmp;
    void *payload;
    ucp_recv_desc_t *rdesc;
    ucp_request_t *req;
    ssize_t unpacked;

    /* rdesc_tmp lives on the stack and only tracks the not-yet-consumed part
     * of the payload: its length and its offset relative to am_data */
    rdesc_tmp.length         = length;
    rdesc_tmp.payload_offset = sizeof(*am_data); /* add sizeof(*rdesc) only if
                                                    am_data won't be handled in
                                                    place */

    /* First, process expected requests. Receive requests are pushed to the
     * match queue only when the endpoint has no unexpected data, so in that
     * state every queue element is a request. */
    if (!ucp_stream_ep_has_data(ep_ext)) {
        while (!ucs_queue_is_empty(&ep_ext->stream.match_q)) {
            req = ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
                                                ucp_request_t, recv.queue);
            payload = UCS_PTR_BYTE_OFFSET(am_data, rdesc_tmp.payload_offset);
            unpacked = ucp_stream_rdata_unpack(payload, rdesc_tmp.length, req);
            if (ucs_unlikely(unpacked < 0)) {
                ucs_fatal("failed to unpack from am_data %p with offset %u to request %p",
                          am_data, rdesc_tmp.payload_offset, req);
            } else if (unpacked == rdesc_tmp.length) {
                /* the whole remaining payload went into this request; it is
                 * completed only if it may be, otherwise it stays at the
                 * queue head (it was only peeked, not pulled) */
                if (ucp_request_can_complete_stream_recv(req)) {
                    ucp_request_complete_stream_recv(req, ep_ext, UCS_OK);
                }
                return UCS_OK;
            }
            /* Here unpacked < rdesc_tmp.length, i.e. the request buffer was
             * filled with payload left over. Skip the consumed bytes;
             * rdesc_tmp is never released because it is not fully consumed. */
            ucp_stream_rdesc_advance(&rdesc_tmp, unpacked, ep_ext);
            /* This request is full, try next one */
            ucs_assert(ucp_request_can_complete_stream_recv(req));
            ucp_request_complete_stream_recv(req, ep_ext, UCS_OK);
        }
    }

    ucs_assert(rdesc_tmp.length > 0);

    /* Now, enqueue the rest of data */
    if (ucs_likely(!(am_flags & UCT_CB_PARAM_FLAG_DESC))) {
        /* UCT did not hand over its descriptor: copy the remainder into a
         * descriptor taken from the worker's am_mp pool */
        rdesc = (ucp_recv_desc_t*)ucs_mpool_get_inline(&worker->am_mp);
        ucs_assertv_always(rdesc != NULL,
                           "ucp recv descriptor is not allocated");
        rdesc->length = rdesc_tmp.length;
        /* reset offset to improve locality */
        rdesc->payload_offset = sizeof(*rdesc) + sizeof(*am_data);
        rdesc->flags = 0;
        memcpy(ucp_stream_rdesc_payload(rdesc),
               UCS_PTR_BYTE_OFFSET(am_data, rdesc_tmp.payload_offset),
               rdesc_tmp.length);
    } else {
        /* slowpath: keep the UCT descriptor and build the receive descriptor
         * immediately before am_data (presumably inside the headroom UCP
         * reserves for this, see UCP_WORKER_HEADROOM_PRIV_SIZE — confirm) */
        rdesc = (ucp_recv_desc_t *)am_data - 1;
        rdesc->length = rdesc_tmp.length;
        rdesc->payload_offset = rdesc_tmp.payload_offset + sizeof(*rdesc);
        rdesc->uct_desc_offset = UCP_WORKER_HEADROOM_PRIV_SIZE;
        rdesc->flags = UCP_RECV_DESC_FLAG_UCT_DESC;
    }

    /* mark the endpoint as holding unexpected data and queue the remainder */
    ucp_ep_from_ext_proto(ep_ext)->flags |= UCP_EP_FLAG_STREAM_HAS_DATA;
    ucs_queue_push(&ep_ext->stream.match_q, &rdesc->stream_queue);

    return UCS_INPROGRESS;
}
463
/*
 * Reset the per-endpoint stream state: clear the ready-list links and create
 * an empty match queue. Does nothing when UCP_FEATURE_STREAM is not enabled.
 */
void ucp_stream_ep_init(ucp_ep_h ep)
{
    ucp_ep_ext_proto_t *ep_ext = ucp_ep_ext_proto(ep);

    if (!(ep->worker->context->config.features & UCP_FEATURE_STREAM)) {
        return;
    }

    ucs_queue_head_init(&ep_ext->stream.match_q);
    ep_ext->stream.ready_list.next = NULL;
    ep_ext->stream.ready_list.prev = NULL;
}
474
/*
 * Tear down the stream state of an endpoint:
 *  1. drop all unexpected (unmatched) data,
 *  2. take the endpoint off the worker stream queue if it is still queued,
 *  3. complete every pending receive request with UCS_ERR_CANCELED.
 * Does nothing when UCP_FEATURE_STREAM is not enabled.
 */
void ucp_stream_ep_cleanup(ucp_ep_h ep)
{
    ucp_ep_ext_proto_t *ep_ext;
    ucp_request_t *pending;
    size_t unused_len;
    void *chunk;

    if (!(ep->worker->context->config.features & UCP_FEATURE_STREAM)) {
        return;
    }

    /* drop unmatched data */
    for (;;) {
        chunk = ucp_stream_recv_data_nb_nolock(ep, &unused_len);
        if (chunk == NULL) {
            break;
        }

        ucs_assert_always(!UCS_PTR_IS_ERR(chunk));
        ucp_stream_data_release(ep, chunk);
    }

    ep_ext = ucp_ep_ext_proto(ep);
    if (ucp_stream_ep_is_queued(ep_ext)) {
        ucp_stream_ep_dequeue(ep_ext);
    }

    /* cancel not completed requests */
    ucs_assert(!ucp_stream_ep_has_data(ep_ext));
    while (!ucs_queue_is_empty(&ep_ext->stream.match_q)) {
        pending = ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
                                                ucp_request_t, recv.queue);
        ucp_request_complete_stream_recv(pending, ep_ext, UCS_ERR_CANCELED);
    }
}
506
/*
 * If the stream feature is enabled and the endpoint already holds unexpected
 * stream data, enqueue it on its worker via ucp_stream_ep_enqueue(), unless
 * it is queued already.
 */
void ucp_stream_ep_activate(ucp_ep_h ep)
{
    ucp_ep_ext_proto_t *ep_ext = ucp_ep_ext_proto(ep);
    ucp_worker_h worker        = ep->worker;

    if (!(worker->context->config.features & UCP_FEATURE_STREAM)) {
        return;
    }

    if (ucp_stream_ep_has_data(ep_ext) && !ucp_stream_ep_is_queued(ep_ext)) {
        ucp_stream_ep_enqueue(ep_ext, worker);
    }
}
516
517 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_am_handler(void * am_arg,void * am_data,size_t am_length,unsigned am_flags)518 ucp_stream_am_handler(void *am_arg, void *am_data, size_t am_length,
519 unsigned am_flags)
520 {
521 ucp_worker_h worker = am_arg;
522 ucp_stream_am_data_t *data = am_data;
523 ucp_ep_h ep;
524 ucp_ep_ext_proto_t *ep_ext;
525 ucs_status_t status;
526
527 ucs_assert(am_length >= sizeof(ucp_stream_am_hdr_t));
528
529 ep = ucp_worker_get_ep_by_ptr(worker, data->hdr.ep_ptr);
530 ep_ext = ucp_ep_ext_proto(ep);
531
532 if (ucs_unlikely(ep->flags & UCP_EP_FLAG_CLOSED)) {
533 ucs_trace_data("ep %p: stream is invalid", ep);
534 /* drop the data */
535 return UCS_OK;
536 }
537
538 status = ucp_stream_am_data_process(worker, ep_ext, data,
539 am_length - sizeof(data->hdr),
540 am_flags);
541 if (status == UCS_OK) {
542 /* rdesc was processed in place */
543 return UCS_OK;
544 }
545
546 ucs_assert(status == UCS_INPROGRESS);
547
548 if (!ucp_stream_ep_is_queued(ep_ext) && (ep->flags & UCP_EP_FLAG_USED)) {
549 ucp_stream_ep_enqueue(ep_ext, worker);
550 }
551
552 return (am_flags & UCT_CB_PARAM_FLAG_DESC) ? UCS_INPROGRESS : UCS_OK;
553 }
554
/*
 * Trace dumper for stream AMs: prints the endpoint pointer from the stream
 * header into 'buffer', followed by a dump of the payload after the header.
 */
static void ucp_stream_am_dump(ucp_worker_h worker, uct_am_trace_type_t type,
                               uint8_t id, const void *data, size_t length,
                               char *buffer, size_t max)
{
    const ucp_stream_am_hdr_t *hdr = data;
    const size_t header_size       = sizeof(*hdr);
    char *end;

    snprintf(buffer, max, "STREAM ep_ptr 0x%lx", hdr->ep_ptr);
    end = buffer + strlen(buffer);

    ucs_assert(hdr->ep_ptr != 0);
    ucp_dump_payload(worker->context, end, (buffer + max) - end,
                     UCS_PTR_BYTE_OFFSET(data, header_size),
                     length - header_size);
}
570
/* Register ucp_stream_am_handler() (with ucp_stream_am_dump() as its trace
 * dumper) for UCP_AM_ID_STREAM_DATA, tied to UCP_FEATURE_STREAM.
 * NOTE(review): the meaning of the trailing 0 argument (presumably AM flags)
 * and the exact role of UCP_DEFINE_AM_PROXY are defined by the UCP AM
 * registration macros — confirm there. */
UCP_DEFINE_AM(UCP_FEATURE_STREAM, UCP_AM_ID_STREAM_DATA, ucp_stream_am_handler,
              ucp_stream_am_dump, 0);

UCP_DEFINE_AM_PROXY(UCP_AM_ID_STREAM_DATA);
575