1 /**
2  * Copyright (C) Mellanox Technologies Ltd. 2001-2017.  ALL RIGHTS RESERVED.
3  *
4  * See file LICENSE for terms.
5  */
6 
7 #ifdef HAVE_CONFIG_H
8 #  include "config.h"
9 #endif
10 
11 #include "wireup.h"
12 
13 #include <ucp/core/ucp_proxy_ep.h>
14 
15 
/*
 * Context handed to the pack callbacks in this file when a "short" send is
 * carried out through a bcopy send: it describes the header and the payload
 * which the callback copies into the destination buffer.
 */
typedef struct {
    uint64_t     header;   /* Written ahead of the payload by the active
                              message pack callback; not read by the tag pack
                              callback */
    const void   *payload; /* Borrowed pointer to the data to be copied */
    size_t       length;   /* Payload size in bytes. Kept as size_t so that a
                              size_t length (tag eager short) is stored
                              without narrowing */
} ucp_signaling_ep_pack_ctx_t;
22 
23 
ucp_signaling_ep_pack_short(void * dest,void * arg)24 static size_t ucp_signaling_ep_pack_short(void *dest, void *arg)
25 {
26     ucp_signaling_ep_pack_ctx_t *ctx = arg;
27 
28     *(uint64_t*)dest = ctx->header;
29     memcpy(UCS_PTR_BYTE_OFFSET(dest, sizeof(uint64_t)), ctx->payload, ctx->length);
30     return sizeof(uint64_t) + ctx->length;
31 }
32 
ucp_signaling_ep_pack_tag_short(void * dest,void * arg)33 static size_t ucp_signaling_ep_pack_tag_short(void *dest, void *arg)
34 {
35     ucp_signaling_ep_pack_ctx_t *ctx = arg;
36 
37     memcpy(dest, ctx->payload, ctx->length);
38     return ctx->length;
39 }
40 
41 static ucs_status_t
ucp_signaling_ep_am_short(uct_ep_h ep,uint8_t id,uint64_t header,const void * payload,unsigned length)42 ucp_signaling_ep_am_short(uct_ep_h ep, uint8_t id, uint64_t header,
43                           const void *payload, unsigned length)
44 {
45     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
46     ucp_signaling_ep_pack_ctx_t ctx;
47     ssize_t packed_size;
48 
49     ctx.header  = header;
50     ctx.payload = payload;
51     ctx.length  = length;
52 
53     ucp_assert_memtype(proxy_ep->ucp_ep->worker->context, ctx.payload,
54                        ctx.length, UCS_MEMORY_TYPE_HOST);
55 
56     packed_size = uct_ep_am_bcopy(proxy_ep->uct_ep, id,
57                                   ucp_signaling_ep_pack_short, &ctx,
58                                   UCT_SEND_FLAG_SIGNALED);
59     if (packed_size < 0) {
60         return (ucs_status_t)packed_size;
61     }
62 
63     ucp_proxy_ep_replace(proxy_ep);
64     return UCS_OK;
65 }
66 
67 static ssize_t
ucp_signaling_ep_am_bcopy(uct_ep_h ep,uint8_t id,uct_pack_callback_t pack_cb,void * arg,unsigned flags)68 ucp_signaling_ep_am_bcopy(uct_ep_h ep, uint8_t id, uct_pack_callback_t pack_cb,
69                           void *arg, unsigned flags)
70 {
71     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
72     ssize_t packed_size;
73 
74     packed_size = uct_ep_am_bcopy(proxy_ep->uct_ep, id, pack_cb, arg,
75                                   flags | UCT_SEND_FLAG_SIGNALED);
76     if (packed_size >= 0) {
77         ucp_proxy_ep_replace(proxy_ep);
78     }
79     return packed_size;
80 }
81 
82 static ucs_status_t
ucp_signaling_ep_am_zcopy(uct_ep_h ep,uint8_t id,const void * header,unsigned header_length,const uct_iov_t * iov,size_t iovcnt,unsigned flags,uct_completion_t * comp)83 ucp_signaling_ep_am_zcopy(uct_ep_h ep, uint8_t id, const void *header,
84                           unsigned header_length, const uct_iov_t *iov,
85                           size_t iovcnt, unsigned flags, uct_completion_t *comp)
86 {
87     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
88     ucs_status_t status;
89 
90     status = uct_ep_am_zcopy(proxy_ep->uct_ep, id, header, header_length, iov,
91                              iovcnt, flags | UCT_SEND_FLAG_SIGNALED, comp);
92     if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
93         ucp_proxy_ep_replace(proxy_ep);
94     }
95     return status;
96 }
97 
98 static ucs_status_t
ucp_signaling_ep_tag_eager_short(uct_ep_h ep,uct_tag_t tag,const void * data,size_t length)99 ucp_signaling_ep_tag_eager_short(uct_ep_h ep, uct_tag_t tag, const void *data,
100                                  size_t length)
101 {
102     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
103     ucp_signaling_ep_pack_ctx_t ctx;
104     ssize_t packed_size;
105 
106     ctx.payload = data;
107     ctx.length  = length;
108 
109     ucp_assert_memtype(proxy_ep->ucp_ep->worker->context, ctx.payload,
110                        ctx.length, UCS_MEMORY_TYPE_HOST);
111 
112     packed_size = uct_ep_tag_eager_bcopy(proxy_ep->uct_ep, tag, 0,
113                                          ucp_signaling_ep_pack_tag_short, &ctx,
114                                          UCT_SEND_FLAG_SIGNALED);
115     if (packed_size < 0) {
116         return (ucs_status_t)packed_size;
117     }
118 
119     ucp_proxy_ep_replace(proxy_ep);
120     return UCS_OK;
121 }
122 
123 static ssize_t
ucp_signaling_ep_tag_eager_bcopy(uct_ep_h ep,uct_tag_t tag,uint64_t imm,uct_pack_callback_t pack_cb,void * arg,unsigned flags)124 ucp_signaling_ep_tag_eager_bcopy(uct_ep_h ep, uct_tag_t tag, uint64_t imm,
125                                  uct_pack_callback_t pack_cb, void *arg,
126                                  unsigned flags)
127 {
128     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
129     ssize_t packed_size;
130 
131     packed_size = uct_ep_tag_eager_bcopy(proxy_ep->uct_ep, tag, imm, pack_cb,
132                                          arg, flags | UCT_SEND_FLAG_SIGNALED);
133     if (packed_size >= 0) {
134         ucp_proxy_ep_replace(proxy_ep);
135     }
136     return packed_size;
137 }
138 
139 static ucs_status_t
ucp_signaling_ep_tag_eager_zcopy(uct_ep_h ep,uct_tag_t tag,uint64_t imm,const uct_iov_t * iov,size_t iovcnt,unsigned flags,uct_completion_t * comp)140 ucp_signaling_ep_tag_eager_zcopy(uct_ep_h ep, uct_tag_t tag, uint64_t imm,
141                                  const uct_iov_t *iov, size_t iovcnt,
142                                  unsigned flags, uct_completion_t *comp)
143 {
144     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
145     ucs_status_t status;
146 
147     status = uct_ep_tag_eager_zcopy(proxy_ep->uct_ep, tag, imm, iov, iovcnt,
148                                     flags | UCT_SEND_FLAG_SIGNALED, comp);
149 
150     if (!UCS_STATUS_IS_ERR(status)) {
151         ucp_proxy_ep_replace(proxy_ep);
152     }
153     return status;
154 }
155 
156 static ucs_status_ptr_t
ucp_signaling_ep_tag_rndv_zcopy(uct_ep_h ep,uct_tag_t tag,const void * header,unsigned header_length,const uct_iov_t * iov,size_t iovcnt,unsigned flags,uct_completion_t * comp)157 ucp_signaling_ep_tag_rndv_zcopy(uct_ep_h ep, uct_tag_t tag, const void *header,
158                                 unsigned header_length, const uct_iov_t *iov,
159                                 size_t iovcnt, unsigned flags,
160                                 uct_completion_t *comp)
161 {
162     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
163     ucs_status_ptr_t status;
164 
165     status = uct_ep_tag_rndv_zcopy(proxy_ep->uct_ep, tag, header, header_length,
166                                    iov, iovcnt, flags | UCT_SEND_FLAG_SIGNALED,
167                                    comp);
168     if (!UCS_PTR_IS_ERR(status)) {
169         ucp_proxy_ep_replace(proxy_ep);
170     }
171     return status;
172 }
173 
174 static ucs_status_t
ucp_signaling_ep_tag_rndv_request(uct_ep_h ep,uct_tag_t tag,const void * header,unsigned header_length,unsigned flags)175 ucp_signaling_ep_tag_rndv_request(uct_ep_h ep, uct_tag_t tag,
176                                   const void* header, unsigned header_length,
177                                   unsigned flags)
178 {
179     ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t);
180     ucs_status_t status;
181 
182     status = uct_ep_tag_rndv_request(proxy_ep->uct_ep, tag, header,
183                                      header_length,
184                                      flags | UCT_SEND_FLAG_SIGNALED);
185     if (status == UCS_OK) {
186         ucp_proxy_ep_replace(proxy_ep);
187     }
188     return status;
189 }
190 
ucp_signaling_ep_create(ucp_ep_h ucp_ep,uct_ep_h uct_ep,int is_owner,uct_ep_h * signaling_ep)191 ucs_status_t ucp_signaling_ep_create(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
192                                      int is_owner, uct_ep_h *signaling_ep)
193 {
194     static uct_iface_ops_t signaling_ep_ops = {
195         .ep_am_short         = ucp_signaling_ep_am_short,
196         .ep_am_bcopy         = ucp_signaling_ep_am_bcopy,
197         .ep_am_zcopy         = ucp_signaling_ep_am_zcopy,
198         .ep_tag_eager_short  = ucp_signaling_ep_tag_eager_short,
199         .ep_tag_eager_bcopy  = ucp_signaling_ep_tag_eager_bcopy,
200         .ep_tag_eager_zcopy  = ucp_signaling_ep_tag_eager_zcopy,
201         .ep_tag_rndv_zcopy   = ucp_signaling_ep_tag_rndv_zcopy,
202         .ep_tag_rndv_request = ucp_signaling_ep_tag_rndv_request
203     };
204 
205     return UCS_CLASS_NEW(ucp_proxy_ep_t, signaling_ep, &signaling_ep_ops,
206                          ucp_ep, uct_ep, is_owner);
207 }
208