1 /**
2  * Copyright (C) Mellanox Technologies Ltd. 2001-2017.  ALL RIGHTS RESERVED.
3  *
4  * See file LICENSE for terms.
5  */
6 
7 #ifndef UCP_STREAM_H_
8 #define UCP_STREAM_H_
9 
10 #include <ucp/core/ucp_ep.h>
11 #include <ucp/core/ucp_ep.inl>
12 #include <ucp/core/ucp_worker.h>
13 
14 
15 typedef struct {
16     uintptr_t                ep_ptr;
17 } UCS_S_PACKED ucp_stream_am_hdr_t;
18 
19 
20 typedef struct {
21     union {
22         ucp_stream_am_hdr_t  hdr;
23         ucp_recv_desc_t     *rdesc;
24     };
25 } ucp_stream_am_data_t;
26 
27 
28 void ucp_stream_ep_init(ucp_ep_h ep);
29 
30 void ucp_stream_ep_cleanup(ucp_ep_h ep);
31 
32 void ucp_stream_ep_activate(ucp_ep_h ep);
33 
34 
ucp_stream_ep_is_queued(ucp_ep_ext_proto_t * ep_ext)35 static UCS_F_ALWAYS_INLINE int ucp_stream_ep_is_queued(ucp_ep_ext_proto_t *ep_ext)
36 {
37     return ep_ext->stream.ready_list.next != NULL;
38 }
39 
ucp_stream_ep_has_data(ucp_ep_ext_proto_t * ep_ext)40 static UCS_F_ALWAYS_INLINE int ucp_stream_ep_has_data(ucp_ep_ext_proto_t *ep_ext)
41 {
42     return ucp_ep_from_ext_proto(ep_ext)->flags & UCP_EP_FLAG_STREAM_HAS_DATA;
43 }
44 
45 static UCS_F_ALWAYS_INLINE
ucp_stream_ep_enqueue(ucp_ep_ext_proto_t * ep_ext,ucp_worker_h worker)46 void ucp_stream_ep_enqueue(ucp_ep_ext_proto_t *ep_ext, ucp_worker_h worker)
47 {
48     ucs_assert(!ucp_stream_ep_is_queued(ep_ext));
49     ucs_list_add_tail(&worker->stream_ready_eps, &ep_ext->stream.ready_list);
50 }
51 
ucp_stream_ep_dequeue(ucp_ep_ext_proto_t * ep_ext)52 static UCS_F_ALWAYS_INLINE void ucp_stream_ep_dequeue(ucp_ep_ext_proto_t *ep_ext)
53 {
54     ucs_list_del(&ep_ext->stream.ready_list);
55     ep_ext->stream.ready_list.next = NULL;
56 }
57 
58 static UCS_F_ALWAYS_INLINE ucp_ep_ext_proto_t*
ucp_stream_worker_dequeue_ep_head(ucp_worker_h worker)59 ucp_stream_worker_dequeue_ep_head(ucp_worker_h worker)
60 {
61     ucp_ep_ext_proto_t *ep_ext = ucs_list_head(&worker->stream_ready_eps,
62                                                ucp_ep_ext_proto_t,
63                                                stream.ready_list);
64 
65     ucs_assert(ep_ext->stream.ready_list.next != NULL);
66     ucp_stream_ep_dequeue(ep_ext);
67     return ep_ext;
68 }
69 
70 #endif /* UCP_STREAM_H_ */
71