1 /**
2  * Copyright (C) Mellanox Technologies Ltd. 2001-2017.  ALL RIGHTS RESERVED.
3  *
4  * See file LICENSE for terms.
5  */
6 
7 #ifdef HAVE_CONFIG_H
8 #  include "config.h"
9 #endif
10 
11 #include <ucp/core/ucp_ep.h>
12 #include <ucp/core/ucp_worker.h>
13 #include <ucp/core/ucp_context.h>
14 #include <ucp/core/ucp_request.h>
15 #include <ucp/core/ucp_request.inl>
16 #include <ucp/stream/stream.h>
17 
18 #include <ucs/datastruct/mpool.inl>
19 #include <ucs/profile/profile.h>
20 
21 
22 /* @verbatim
23  * Data layout within Stream AM
24  * |---------------------------------------------------------------------------------------------------------------------------|
25  * | ucp_recv_desc_t                                                 | \    / | ucp_stream_am_data_t | payload                 |
26  * |-----------------------------------------------------------------|  \  /  |----------------------|-------------------------|
27  * | stream_queue        | length         | payload_offset | flags   |   \/   | am_header            |                         |
28  * | tag_list (not used) |                |                |         |   /\   | rdesc                |                         |
29  * |---------------------|----------------|----------------|---------|  /  \  |----------------------|-------------------------|
30  * | 4 * sizeof(ptr)     | 32 bits        | 32 bits        | 16 bits | /    \ | 64 bits              | up to TL AM buffer size |
31  * |---------------------------------------------------------------------------------------------------------------------------|
32  * @endverbatim
33  *
34  * stream_queue   is an entry link in the "unexpected" queue per endpoint
35  * length         is an actual size of 'payload'
36  * payload_offset is a distance between 'ucp_recv_desc_t *' and 'payload *'
 * X              is an optional empty space (not shown in the diagram above)
 *                which results from a partially consumed payload, when
 *                'length' is greater than the user's buffer size passed to
 *                @ref ucp_stream_recv_nb
 * am_header      is an active message header; it is no longer valid after
 *                ucp_recv_desc_t initialization and setup of offsets
 * rdesc          pointer to the owning 'ucp_recv_desc_t'; it is needed to get
 *                access to the 'ucp_recv_desc_t' inside
 *                @ref ucp_stream_data_release after the buffer was returned to
 *                the user by @ref ucp_stream_recv_data_nb as a pointer to
 *                'payload'
46  */
47 
48 
/* Pointer to the first unconsumed payload byte of receive descriptor _rdesc.
 * NOTE: evaluates _rdesc twice. */
#define ucp_stream_rdesc_payload(_rdesc)                                      \
    (UCS_PTR_BYTE_OFFSET((_rdesc), (_rdesc)->payload_offset))


/* Stream AM data header located immediately before the payload of _rdesc.
 * The offset is negated as a signed value instead of relying on the unsigned
 * wrap-around of -sizeof(). */
#define ucp_stream_rdesc_am_data(_rdesc)                                      \
    ((ucp_stream_am_data_t *)                                                 \
     UCS_PTR_BYTE_OFFSET(ucp_stream_rdesc_payload(_rdesc),                    \
                         -(ssize_t)sizeof(ucp_stream_am_data_t)))


/* Receive descriptor owning the payload pointer _data that was handed to the
 * user (the AM data header right before the payload stores it). The argument
 * is parenthesized so that expressions such as 'p + n' are cast as a whole. */
#define ucp_stream_rdesc_from_data(_data)                                     \
    ((((ucp_stream_am_data_t *)(_data)) - 1)->rdesc)
61 
62 
63 static UCS_F_ALWAYS_INLINE ucp_recv_desc_t *
ucp_stream_rdesc_dequeue(ucp_ep_ext_proto_t * ep_ext)64 ucp_stream_rdesc_dequeue(ucp_ep_ext_proto_t *ep_ext)
65 {
66     ucp_recv_desc_t *rdesc = ucs_queue_pull_elem_non_empty(&ep_ext->stream.match_q,
67                                                            ucp_recv_desc_t,
68                                                            stream_queue);
69     ucs_assert(ucp_stream_ep_has_data(ep_ext));
70     if (ucs_unlikely(ucs_queue_is_empty(&ep_ext->stream.match_q))) {
71         ucp_ep_from_ext_proto(ep_ext)->flags &= ~UCP_EP_FLAG_STREAM_HAS_DATA;
72         if (ucp_stream_ep_is_queued(ep_ext)) {
73             ucp_stream_ep_dequeue(ep_ext);
74         }
75     }
76 
77     return rdesc;
78 }
79 
80 static UCS_F_ALWAYS_INLINE ucp_recv_desc_t *
ucp_stream_rdesc_get(ucp_ep_ext_proto_t * ep_ext)81 ucp_stream_rdesc_get(ucp_ep_ext_proto_t *ep_ext)
82 {
83     ucp_recv_desc_t *rdesc = ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
84                                                            ucp_recv_desc_t,
85                                                            stream_queue);
86 
87     ucs_assert(ucp_stream_ep_has_data(ep_ext));
88     ucs_trace_data("ep %p, rdesc %p with %u stream bytes",
89                    ucp_ep_from_ext_proto(ep_ext), rdesc, rdesc->length);
90 
91     return rdesc;
92 }
93 
94 static UCS_F_ALWAYS_INLINE ucs_status_ptr_t
ucp_stream_recv_data_nb_nolock(ucp_ep_h ep,size_t * length)95 ucp_stream_recv_data_nb_nolock(ucp_ep_h ep, size_t *length)
96 {
97     ucp_ep_ext_proto_t   *ep_ext = ucp_ep_ext_proto(ep);
98     ucp_recv_desc_t      *rdesc;
99     ucp_stream_am_data_t *am_data;
100 
101     if (ucs_unlikely(!ucp_stream_ep_has_data(ep_ext))) {
102         return UCS_STATUS_PTR(UCS_OK);
103     }
104 
105     rdesc = ucp_stream_rdesc_dequeue(ep_ext);
106 
107     *length         = rdesc->length;
108     am_data         = ucp_stream_rdesc_am_data(rdesc);
109     am_data->rdesc  = rdesc;
110     return am_data + 1;
111 }
112 
113 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_data_nb, (ep, length),
114                  ucp_ep_h ep, size_t *length)
115 {
116     ucs_status_ptr_t status_ptr;
117 
118     UCP_CONTEXT_CHECK_FEATURE_FLAGS(ep->worker->context, UCP_FEATURE_STREAM,
119                                     return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM));
120 
121     UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
122     status_ptr = ucp_stream_recv_data_nb_nolock(ep, length);
123     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
124 
125     return status_ptr;
126 }
127 
128 static UCS_F_ALWAYS_INLINE void
ucp_stream_rdesc_dequeue_and_release(ucp_recv_desc_t * rdesc,ucp_ep_ext_proto_t * ep_ext)129 ucp_stream_rdesc_dequeue_and_release(ucp_recv_desc_t *rdesc,
130                                      ucp_ep_ext_proto_t *ep_ext)
131 {
132     ucs_assert(ucp_stream_ep_has_data(ep_ext));
133     ucs_assert(rdesc == ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
134                                                       ucp_recv_desc_t,
135                                                       stream_queue));
136     ucp_stream_rdesc_dequeue(ep_ext);
137     ucp_recv_desc_release(rdesc);
138 }
139 
140 UCS_PROFILE_FUNC_VOID(ucp_stream_data_release, (ep, data),
141                       ucp_ep_h ep, void *data)
142 {
143     ucp_recv_desc_t *rdesc = ucp_stream_rdesc_from_data(data);
144 
145     UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
146 
147     ucp_recv_desc_release(rdesc);
148 
149     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
150 }
151 
152 static UCS_F_ALWAYS_INLINE ssize_t
ucp_stream_rdata_unpack(const void * rdata,size_t length,ucp_request_t * dst_req)153 ucp_stream_rdata_unpack(const void *rdata, size_t length, ucp_request_t *dst_req)
154 {
155     size_t       valid_len;
156     int          last;
157     ucs_status_t status;
158 
159     /* Truncated error is not actual for stream, need to adjust */
160     valid_len = dst_req->recv.length - dst_req->recv.stream.offset;
161     if (valid_len <= length) {
162         last = (valid_len == length);
163     } else {
164         valid_len = length;
165         last      = !(dst_req->flags & UCP_REQUEST_FLAG_STREAM_RECV_WAITALL);
166     }
167 
168     status = ucp_request_recv_data_unpack(dst_req, rdata, valid_len,
169                                           dst_req->recv.stream.offset, last);
170     if (ucs_likely(status == UCS_OK)) {
171         dst_req->recv.stream.offset += valid_len;
172         ucs_trace_data("unpacked %zd bytes of stream data %p",
173                        valid_len, rdata);
174         return valid_len;
175     }
176 
177     ucs_assert(status != UCS_ERR_MESSAGE_TRUNCATED);
178     return status;
179 }
180 
181 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_rdesc_advance(ucp_recv_desc_t * rdesc,ssize_t offset,ucp_ep_ext_proto_t * ep_ext)182 ucp_stream_rdesc_advance(ucp_recv_desc_t *rdesc, ssize_t offset,
183                          ucp_ep_ext_proto_t *ep_ext)
184 {
185     ucs_assert(offset <= rdesc->length);
186 
187     if (ucs_unlikely(offset < 0)) {
188         return (ucs_status_t)offset;
189     } else if (ucs_likely(offset == rdesc->length)) {
190         ucp_stream_rdesc_dequeue_and_release(rdesc, ep_ext);
191     } else {
192         rdesc->length         -= offset;
193         rdesc->payload_offset += offset;
194     }
195 
196     return UCS_OK;
197 }
198 
199 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_process_rdesc_inplace(ucp_recv_desc_t * rdesc,ucp_datatype_t dt,void * buffer,size_t count,size_t length,ucp_ep_ext_proto_t * ep_ext)200 ucp_stream_process_rdesc_inplace(ucp_recv_desc_t *rdesc, ucp_datatype_t dt,
201                                  void *buffer, size_t count, size_t length,
202                                  ucp_ep_ext_proto_t *ep_ext)
203 {
204     ucp_worker_h worker = ucp_ep_from_ext_proto(ep_ext)->worker;
205     ucs_status_t status;
206     ssize_t unpacked;
207     ucs_memory_type_t mem_type;
208 
209     mem_type = ucp_memory_type_detect(worker->context, buffer, length);
210     status   = ucp_dt_unpack_only(worker, buffer, count, dt, mem_type,
211                                   ucp_stream_rdesc_payload(rdesc), length, 0);
212 
213     unpacked = ucs_likely(status == UCS_OK) ? length : status;
214 
215     return ucp_stream_rdesc_advance(rdesc, unpacked, ep_ext);
216 }
217 
218 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_process_rdesc(ucp_recv_desc_t * rdesc,ucp_ep_ext_proto_t * ep_ext,ucp_request_t * req)219 ucp_stream_process_rdesc(ucp_recv_desc_t *rdesc, ucp_ep_ext_proto_t *ep_ext,
220                          ucp_request_t *req)
221 {
222     ssize_t unpacked;
223 
224     unpacked = ucp_stream_rdata_unpack(ucp_stream_rdesc_payload(rdesc),
225                                        rdesc->length, req);
226     ucs_assert(req->recv.stream.offset <= req->recv.length);
227 
228     return ucp_stream_rdesc_advance(rdesc, unpacked, ep_ext);
229 }
230 
231 static UCS_F_ALWAYS_INLINE void
ucp_stream_recv_request_init(ucp_request_t * req,ucp_ep_h ep,void * buffer,size_t count,size_t length,ucp_datatype_t datatype,const ucp_request_param_t * param)232 ucp_stream_recv_request_init(ucp_request_t *req, ucp_ep_h ep, void *buffer,
233                              size_t count, size_t length,
234                              ucp_datatype_t datatype,
235                              const ucp_request_param_t *param)
236 {
237     uint32_t flags = ucp_request_param_flags(param);
238 
239     req->flags              = UCP_REQUEST_FLAG_STREAM_RECV |
240                               ((flags & UCP_STREAM_RECV_FLAG_WAITALL) ?
241                                UCP_REQUEST_FLAG_STREAM_RECV_WAITALL : 0);
242 #if UCS_ENABLE_ASSERT
243     req->status             = UCS_OK; /* for ucp_request_recv_data_unpack() */
244 #endif
245     req->recv.stream.length = 0;
246     req->recv.stream.offset = 0;
247 
248     ucp_dt_recv_state_init(&req->recv.state, buffer, datatype, count);
249 
250     req->recv.worker   = ep->worker;
251     req->recv.buffer   = buffer;
252     req->recv.datatype = datatype;
253     req->recv.length   = ucs_likely(!UCP_DT_IS_GENERIC(datatype)) ? length :
254                          ucp_dt_length(datatype, count, NULL, &req->recv.state);
255     req->recv.mem_type = ucp_memory_type_detect(ep->worker->context,
256                                                 (void*)buffer, req->recv.length);
257 
258     if (param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) {
259         req->flags         |= UCP_REQUEST_FLAG_CALLBACK;
260         req->recv.stream.cb = param->cb.recv_stream;
261         req->user_data      = (param->op_attr_mask & UCP_OP_ATTR_FIELD_USER_DATA) ?
262                               param->user_data : NULL;
263     }
264 }
265 
266 static UCS_F_ALWAYS_INLINE int
ucp_stream_recv_nb_is_inplace(ucp_ep_ext_proto_t * ep_ext,size_t dt_length)267 ucp_stream_recv_nb_is_inplace(ucp_ep_ext_proto_t *ep_ext, size_t dt_length)
268 {
269     return ucp_stream_ep_has_data(ep_ext) &&
270            (ucp_stream_rdesc_get(ep_ext)->length >= dt_length);
271 }
272 
273 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nb,
274                  (ep, buffer, count, datatype, cb, length, flags),
275                  ucp_ep_h ep, void *buffer, size_t count,
276                  ucp_datatype_t datatype, ucp_stream_recv_callback_t cb,
277                  size_t *length, unsigned flags)
278 {
279     ucp_request_param_t param = {
280         .op_attr_mask   = UCP_OP_ATTR_FIELD_DATATYPE |
281                           UCP_OP_ATTR_FIELD_CALLBACK |
282                           UCP_OP_ATTR_FIELD_FLAGS,
283         .cb.recv_stream = (ucp_stream_recv_nbx_callback_t)cb,
284         .flags          = flags,
285         .datatype       = datatype
286     };
287 
288     return ucp_stream_recv_nbx(ep, buffer, count, length, &param);
289 }
290 
291 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_stream_recv_nbx,
292                  (ep, buffer, count, length, param),
293                  ucp_ep_h ep, void *buffer, size_t count, size_t *length,
294                  const ucp_request_param_t *param)
295 {
296     ucs_status_t        status  = UCS_OK;
297     ucp_ep_ext_proto_t  *ep_ext = ucp_ep_ext_proto(ep);
298     ucp_datatype_t      datatype;
299     size_t              dt_length;
300     ucp_request_t       *req;
301     ucp_recv_desc_t     *rdesc;
302     uint32_t            attr_mask;
303 
304     UCP_CONTEXT_CHECK_FEATURE_FLAGS(ep->worker->context, UCP_FEATURE_STREAM,
305                                     return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM));
306     UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
307 
308     attr_mask = param->op_attr_mask &
309                 (UCP_OP_ATTR_FIELD_DATATYPE | UCP_OP_ATTR_FLAG_NO_IMM_CMPL);
310     if (ucs_likely(attr_mask == 0)) {
311         datatype  = ucp_dt_make_contig(1);
312         dt_length = count; /* use dt_lendth to suppress coverity false positive */
313         if (ucs_likely(ucp_stream_recv_nb_is_inplace(ep_ext, count))) {
314             status  = ucp_stream_process_rdesc_inplace(ucp_stream_rdesc_get(ep_ext),
315                                                        datatype, buffer, count,
316                                                        dt_length, ep_ext);
317             *length = count;
318             goto out_status;
319         }
320     } else if (attr_mask == UCP_OP_ATTR_FIELD_DATATYPE) {
321         datatype  = param->datatype;
322         if (!UCP_DT_IS_GENERIC(datatype)) {
323             dt_length = ucp_dt_length(datatype, count, buffer, NULL);
324             if (ucp_stream_recv_nb_is_inplace(ep_ext, dt_length)) {
325                 status  = ucp_stream_process_rdesc_inplace(ucp_stream_rdesc_get(ep_ext),
326                                                            datatype, buffer, count,
327                                                            dt_length, ep_ext);
328                 *length = dt_length;
329                 goto out_status;
330             }
331         } else {
332             dt_length = 0;
333         }
334     } else {
335         datatype  = ucp_dt_make_contig(1);
336         dt_length = count;
337     }
338 
339     if (ucs_unlikely(param->op_attr_mask & UCP_OP_ATTR_FLAG_FORCE_IMM_CMPL)) {
340         status = UCS_ERR_NO_RESOURCE;
341         goto out_status;
342     }
343 
344     req = ucp_request_get_param(ep->worker, param,
345                                 {
346                                     status = UCS_ERR_NO_MEMORY;
347                                     goto out_status;
348                                 });
349 
350     ucp_stream_recv_request_init(req, ep, buffer, count, dt_length, datatype,
351                                  param);
352 
353     /* OK, lets obtain all arrived data which matches the recv size */
354     while ((req->recv.stream.offset < req->recv.length) &&
355            ucp_stream_ep_has_data(ep_ext)) {
356 
357         rdesc  = ucp_stream_rdesc_get(ep_ext);
358         status = ucp_stream_process_rdesc(rdesc, ep_ext, req);
359         if (ucs_unlikely(status != UCS_OK)) {
360             goto out_put_request;
361         }
362 
363         /*
364          * NOTE: generic datatype can be completed with any amount of data to
365          *       avoid extra logic in ucp_stream_process_rdesc, exception is
366          *       WAITALL flag
367          */
368         if (ucs_unlikely(UCP_DT_IS_GENERIC(req->recv.datatype)) &&
369             !(req->flags & UCP_REQUEST_FLAG_STREAM_RECV_WAITALL)) {
370             break;
371         }
372     }
373 
374     ucs_assert(req->recv.stream.offset <= req->recv.length);
375 
376     if (ucp_request_can_complete_stream_recv(req)) {
377         *length = req->recv.stream.offset;
378     } else {
379         ucs_assert(!ucp_stream_ep_has_data(ep_ext));
380         ucs_queue_push(&ep_ext->stream.match_q, &req->recv.queue);
381         req += 1;
382         goto out;
383     }
384 
385 out_put_request:
386     ucp_request_put_param(param, req);
387 
388 out_status:
389     req = UCS_STATUS_PTR(status);
390 
391 out:
392     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
393     return req;
394 }
395 
/*
 * Deliver 'length' bytes of stream payload from the AM buffer 'am_data' to
 * the endpoint.
 *
 * If the endpoint has no unconsumed data queued, the match queue can only hold
 * posted receive requests (requests are queued only when no data is pending).
 * The payload is then used to fill those requests in order. Returns UCS_OK if
 * the whole payload was consumed this way.
 *
 * Any remaining payload is queued on the match queue as a receive descriptor:
 * - without UCT_CB_PARAM_FLAG_DESC: copied into a descriptor taken from
 *   worker->am_mp;
 * - with UCT_CB_PARAM_FLAG_DESC: described in place by a descriptor header
 *   written into the memory right before 'am_data' (the AM handler then
 *   returns UCS_INPROGRESS, keeping the UCT buffer).
 * The endpoint is then flagged with UCP_EP_FLAG_STREAM_HAS_DATA and
 * UCS_INPROGRESS is returned.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_am_data_process(ucp_worker_t *worker, ucp_ep_ext_proto_t *ep_ext,
                           ucp_stream_am_data_t *am_data, size_t length,
                           unsigned am_flags)
{
    /* stack descriptor tracking the not yet consumed part of am_data;
     * payload_offset is relative to am_data here, not to rdesc_tmp */
    ucp_recv_desc_t  rdesc_tmp;
    void            *payload;
    ucp_recv_desc_t *rdesc;
    ucp_request_t   *req;
    ssize_t          unpacked;

    rdesc_tmp.length         = length;
    rdesc_tmp.payload_offset = sizeof(*am_data); /* add sizeof(*rdesc) only if
                                                    am_data won't be handled in
                                                    place */

    /* First, process expected requests */
    if (!ucp_stream_ep_has_data(ep_ext)) {
        while (!ucs_queue_is_empty(&ep_ext->stream.match_q)) {
            req      = ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
                                                     ucp_request_t, recv.queue);
            payload  = UCS_PTR_BYTE_OFFSET(am_data, rdesc_tmp.payload_offset);
            unpacked = ucp_stream_rdata_unpack(payload, rdesc_tmp.length, req);
            if (ucs_unlikely(unpacked < 0)) {
                ucs_fatal("failed to unpack from am_data %p with offset %u to request %p",
                          am_data, rdesc_tmp.payload_offset, req);
            } else if (unpacked == rdesc_tmp.length) {
                /* the whole remaining payload went into this request; complete
                 * it only if ucp_request_can_complete_stream_recv() allows
                 * (e.g. a WAITALL request may still be short) */
                if (ucp_request_can_complete_stream_recv(req)) {
                    ucp_request_complete_stream_recv(req, ep_ext, UCS_OK);
                }
                return UCS_OK;
            }
            /* here 0 <= unpacked < rdesc_tmp.length, so advance only shrinks
             * rdesc_tmp (it never releases the stack descriptor) */
            ucp_stream_rdesc_advance(&rdesc_tmp, unpacked, ep_ext);
            /* This request is full, try next one */
            ucs_assert(ucp_request_can_complete_stream_recv(req));
            ucp_request_complete_stream_recv(req, ep_ext, UCS_OK);
        }
    }

    ucs_assert(rdesc_tmp.length > 0);

    /* Now, enqueue the rest of data */
    if (ucs_likely(!(am_flags & UCT_CB_PARAM_FLAG_DESC))) {
        /* the UCT buffer cannot be kept: copy the remainder into a
         * descriptor from the worker's AM memory pool */
        rdesc = (ucp_recv_desc_t*)ucs_mpool_get_inline(&worker->am_mp);
        ucs_assertv_always(rdesc != NULL,
                           "ucp recv descriptor is not allocated");
        rdesc->length         = rdesc_tmp.length;
        /* reset offset to improve locality */
        rdesc->payload_offset = sizeof(*rdesc) + sizeof(*am_data);
        rdesc->flags          = 0;
        memcpy(ucp_stream_rdesc_payload(rdesc),
               UCS_PTR_BYTE_OFFSET(am_data, rdesc_tmp.payload_offset),
               rdesc_tmp.length);
    } else {
        /* slowpath */
        /* keep the UCT descriptor: the rdesc header is placed right before
         * am_data and its payload_offset becomes relative to that header */
        rdesc                  = (ucp_recv_desc_t *)am_data - 1;
        rdesc->length          = rdesc_tmp.length;
        rdesc->payload_offset  = rdesc_tmp.payload_offset + sizeof(*rdesc);
        rdesc->uct_desc_offset = UCP_WORKER_HEADROOM_PRIV_SIZE;
        rdesc->flags           = UCP_RECV_DESC_FLAG_UCT_DESC;
    }

    ucp_ep_from_ext_proto(ep_ext)->flags |= UCP_EP_FLAG_STREAM_HAS_DATA;
    ucs_queue_push(&ep_ext->stream.match_q, &rdesc->stream_queue);

    return UCS_INPROGRESS;
}
463 
ucp_stream_ep_init(ucp_ep_h ep)464 void ucp_stream_ep_init(ucp_ep_h ep)
465 {
466     ucp_ep_ext_proto_t *ep_ext = ucp_ep_ext_proto(ep);
467 
468     if (ep->worker->context->config.features & UCP_FEATURE_STREAM) {
469         ep_ext->stream.ready_list.prev = NULL;
470         ep_ext->stream.ready_list.next = NULL;
471         ucs_queue_head_init(&ep_ext->stream.match_q);
472     }
473 }
474 
ucp_stream_ep_cleanup(ucp_ep_h ep)475 void ucp_stream_ep_cleanup(ucp_ep_h ep)
476 {
477     ucp_ep_ext_proto_t* ep_ext;
478     ucp_request_t *req;
479     size_t length;
480     void *data;
481 
482     if (!(ep->worker->context->config.features & UCP_FEATURE_STREAM)) {
483         return;
484     }
485 
486     /* drop unmatched data */
487     while ((data = ucp_stream_recv_data_nb_nolock(ep, &length)) != NULL) {
488         ucs_assert_always(!UCS_PTR_IS_ERR(data));
489         ucp_stream_data_release(ep, data);
490     }
491 
492     ep_ext = ucp_ep_ext_proto(ep);
493 
494     if (ucp_stream_ep_is_queued(ep_ext)) {
495         ucp_stream_ep_dequeue(ep_ext);
496     }
497 
498     /* cancel not completed requests */
499     ucs_assert(!ucp_stream_ep_has_data(ep_ext));
500     while (!ucs_queue_is_empty(&ep_ext->stream.match_q)) {
501         req = ucs_queue_head_elem_non_empty(&ep_ext->stream.match_q,
502                                             ucp_request_t, recv.queue);
503         ucp_request_complete_stream_recv(req, ep_ext, UCS_ERR_CANCELED);
504     }
505 }
506 
ucp_stream_ep_activate(ucp_ep_h ep)507 void ucp_stream_ep_activate(ucp_ep_h ep)
508 {
509     ucp_ep_ext_proto_t *ep_ext = ucp_ep_ext_proto(ep);
510 
511     if ((ep->worker->context->config.features & UCP_FEATURE_STREAM) &&
512         ucp_stream_ep_has_data(ep_ext) && !ucp_stream_ep_is_queued(ep_ext)) {
513         ucp_stream_ep_enqueue(ep_ext, ep->worker);
514     }
515 }
516 
517 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_stream_am_handler(void * am_arg,void * am_data,size_t am_length,unsigned am_flags)518 ucp_stream_am_handler(void *am_arg, void *am_data, size_t am_length,
519                       unsigned am_flags)
520 {
521     ucp_worker_h          worker    = am_arg;
522     ucp_stream_am_data_t *data      = am_data;
523     ucp_ep_h              ep;
524     ucp_ep_ext_proto_t    *ep_ext;
525     ucs_status_t          status;
526 
527     ucs_assert(am_length >= sizeof(ucp_stream_am_hdr_t));
528 
529     ep     = ucp_worker_get_ep_by_ptr(worker, data->hdr.ep_ptr);
530     ep_ext = ucp_ep_ext_proto(ep);
531 
532     if (ucs_unlikely(ep->flags & UCP_EP_FLAG_CLOSED)) {
533         ucs_trace_data("ep %p: stream is invalid", ep);
534         /* drop the data */
535         return UCS_OK;
536     }
537 
538     status = ucp_stream_am_data_process(worker, ep_ext, data,
539                                         am_length - sizeof(data->hdr),
540                                         am_flags);
541     if (status == UCS_OK) {
542         /* rdesc was processed in place */
543         return UCS_OK;
544     }
545 
546     ucs_assert(status == UCS_INPROGRESS);
547 
548     if (!ucp_stream_ep_is_queued(ep_ext) && (ep->flags & UCP_EP_FLAG_USED)) {
549         ucp_stream_ep_enqueue(ep_ext, worker);
550     }
551 
552     return (am_flags & UCT_CB_PARAM_FLAG_DESC) ? UCS_INPROGRESS : UCS_OK;
553 }
554 
ucp_stream_am_dump(ucp_worker_h worker,uct_am_trace_type_t type,uint8_t id,const void * data,size_t length,char * buffer,size_t max)555 static void ucp_stream_am_dump(ucp_worker_h worker, uct_am_trace_type_t type,
556                                uint8_t id, const void *data, size_t length,
557                                char *buffer, size_t max)
558 {
559     const ucp_stream_am_hdr_t *hdr    = data;
560     size_t                    hdr_len = sizeof(*hdr);
561     char                      *p;
562 
563     snprintf(buffer, max, "STREAM ep_ptr 0x%lx", hdr->ep_ptr);
564     p = buffer + strlen(buffer);
565 
566     ucs_assert(hdr->ep_ptr != 0);
567     ucp_dump_payload(worker->context, p, buffer + max - p,
568                      UCS_PTR_BYTE_OFFSET(data, hdr_len), length - hdr_len);
569 }
570 
/* Register ucp_stream_am_handler and the ucp_stream_am_dump trace callback
 * for UCP_AM_ID_STREAM_DATA, associated with UCP_FEATURE_STREAM. */
UCP_DEFINE_AM(UCP_FEATURE_STREAM, UCP_AM_ID_STREAM_DATA, ucp_stream_am_handler,
              ucp_stream_am_dump, 0);

/* NOTE(review): defines the proxy variant of the same AM handler; see
 * UCP_DEFINE_AM_PROXY for its exact semantics. */
UCP_DEFINE_AM_PROXY(UCP_AM_ID_STREAM_DATA);
575