1 /**
2  * Copyright (C) Mellanox Technologies Ltd. 2001-2018.  ALL RIGHTS RESERVED.
3  *
4  * See file LICENSE for terms.
5  */
6 
7 #ifdef HAVE_CONFIG_H
8 #  include "config.h"
9 #endif
10 
11 #include "rma.h"
12 #include "rma.inl"
13 
14 #include <ucp/core/ucp_mm.h>
15 
16 #include <ucp/dt/dt_contig.h>
17 #include <ucs/profile/profile.h>
18 #include <ucs/sys/stubs.h>
19 
20 
/*
 * Execute _on_null when _ptr is NULL. The test is skipped entirely when
 * ENABLE_PARAMS_CHECK evaluates to 0.
 */
#define UCP_RMA_CHECK_BUFFER(_ptr, _on_null) \
    do { \
        if (ENABLE_PARAMS_CHECK && ucs_unlikely((_ptr) == NULL)) { \
            _on_null; \
        } \
    } while (0)
27 
28 
/*
 * Execute _on_zero when _len is 0. The callers in this file use it to return
 * success immediately for zero-length transfers.
 */
#define UCP_RMA_CHECK_ZERO_LENGTH(_len, _on_zero) \
    do { \
        if (0 == (_len)) { \
            _on_zero; \
        } \
    } while (0)
35 
36 
/*
 * Argument validation for RMA calls that return ucs_status_t, in this order:
 *  - UCP_FEATURE_RMA feature-flag check fails  -> UCS_ERR_INVALID_PARAM
 *  - zero length                               -> UCS_OK (nothing to do)
 *  - NULL buffer (when params check enabled)   -> UCS_ERR_INVALID_PARAM
 */
#define UCP_RMA_CHECK(_ctx, _buf, _len) \
    do { \
        UCP_CONTEXT_CHECK_FEATURE_FLAGS(_ctx, UCP_FEATURE_RMA, \
                                        return UCS_ERR_INVALID_PARAM); \
        UCP_RMA_CHECK_ZERO_LENGTH(_len, return UCS_OK); \
        UCP_RMA_CHECK_BUFFER(_buf, return UCS_ERR_INVALID_PARAM); \
    } while (0)
44 
45 
/*
 * Same validation as UCP_RMA_CHECK, for calls that return ucs_status_ptr_t:
 *  - UCP_FEATURE_RMA feature-flag check fails  -> UCS_STATUS_PTR(INVALID_PARAM)
 *  - zero length                               -> NULL (i.e. completed, UCS_OK)
 *  - NULL buffer (when params check enabled)   -> UCS_STATUS_PTR(INVALID_PARAM)
 */
#define UCP_RMA_CHECK_PTR(_ctx, _buf, _len) \
    do { \
        UCP_CONTEXT_CHECK_FEATURE_FLAGS(_ctx, UCP_FEATURE_RMA, \
                                        return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); \
        UCP_RMA_CHECK_ZERO_LENGTH(_len, return NULL); \
        UCP_RMA_CHECK_BUFFER(_buf, \
                             return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); \
    } while (0)
54 
55 
/*
 * Reject, with UCS_STATUS_PTR(UCS_ERR_UNSUPPORTED), any explicitly specified
 * datatype other than ucp_dt_make_contig(1). The RMA request path in this
 * file only builds contiguous byte requests.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement:
 * the call site's trailing ';' is then valid even inside an unbraced
 * if/else, and a following 'else' can never bind to the macro's inner 'if'.
 */
#define UCP_RMA_CHECK_CONTIG1(_param) \
    do { \
        if (ucs_unlikely(ENABLE_PARAMS_CHECK && \
                         ((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_DATATYPE) && \
                         ((_param)->datatype != ucp_dt_make_contig(1)))) { \
            return UCS_STATUS_PTR(UCS_ERR_UNSUPPORTED); \
        } \
    } while (0)
62 
63 
64 /* request can be released if
65  *  - all fragments were sent (length == 0) (bcopy & zcopy mix)
66  *  - all zcopy fragments are done (uct_comp.count == 0)
67  *  - and request was allocated from the mpool
68  *    (checked in ucp_request_complete_send)
69  *
70  * Request can be released either immediately or in the completion callback.
71  * We must check req length in the completion callback to avoid the following
72  * scenario:
73  *  partial_send;no_resos;progress;
74  *  send_completed;cb called;req free(ooops);
75  *  next_partial_send; (oops req already freed)
76  */
ucp_rma_request_advance(ucp_request_t * req,ssize_t frag_length,ucs_status_t status)77 ucs_status_t ucp_rma_request_advance(ucp_request_t *req, ssize_t frag_length,
78                                      ucs_status_t status)
79 {
80     ucs_assert(status != UCS_ERR_NOT_IMPLEMENTED);
81 
82     if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
83         if (status != UCS_ERR_NO_RESOURCE) {
84             ucp_request_send_buffer_dereg(req);
85             ucp_request_complete_send(req, status);
86         }
87         return status;
88     }
89 
90     ucs_assert(frag_length >= 0);
91     ucs_assert(req->send.length >= frag_length);
92     req->send.length -= frag_length;
93     if (req->send.length == 0) {
94         /* bcopy is the fast path */
95         if (ucs_likely(req->send.state.uct_comp.count == 0)) {
96             ucp_request_send_buffer_dereg(req);
97             ucp_request_complete_send(req, UCS_OK);
98         }
99         return UCS_OK;
100     }
101     req->send.buffer           = UCS_PTR_BYTE_OFFSET(req->send.buffer, frag_length);
102     req->send.rma.remote_addr += frag_length;
103     return UCS_INPROGRESS;
104 }
105 
/*
 * UCT completion callback installed for requests below the zcopy threshold.
 *
 * The request is completed only when send.length equals
 * send.state.dt.offset. Otherwise completion is left to the progress path,
 * which presumably is still posting fragments. No buffer deregistration is
 * needed because these requests are not registered.
 */
static void ucp_rma_request_bcopy_completion(uct_completion_t *self,
                                             ucs_status_t status)
{
    ucp_request_t *req;

    req = ucs_container_of(self, ucp_request_t, send.state.uct_comp);
    if (ucs_unlikely(req->send.length != req->send.state.dt.offset)) {
        return;
    }

    ucp_request_complete_send(req, status);
}
116 
/*
 * UCT completion callback installed for requests at or above the zcopy
 * threshold.
 *
 * Uses the same completion guard as the bcopy variant (send.length must equal
 * send.state.dt.offset). On completion it also releases the buffer
 * registration made by ucp_rma_request_init().
 */
static void ucp_rma_request_zcopy_completion(uct_completion_t *self,
                                             ucs_status_t status)
{
    ucp_request_t *req;

    req = ucs_container_of(self, ucp_request_t, send.state.uct_comp);
    if (ucs_unlikely(req->send.length != req->send.state.dt.offset)) {
        return;
    }

    ucp_request_send_buffer_dereg(req);
    ucp_request_complete_send(req, status);
}
128 
129 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_rma_request_init(ucp_request_t * req,ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,uct_pending_callback_t cb,size_t zcopy_thresh,int flags)130 ucp_rma_request_init(ucp_request_t *req, ucp_ep_h ep, const void *buffer,
131                      size_t length, uint64_t remote_addr, ucp_rkey_h rkey,
132                      uct_pending_callback_t cb, size_t zcopy_thresh, int flags)
133 {
134     req->flags                = flags; /* Implicit release */
135     req->send.ep              = ep;
136     req->send.buffer          = (void*)buffer;
137     req->send.datatype        = ucp_dt_make_contig(1);
138     req->send.mem_type        = UCS_MEMORY_TYPE_HOST;
139     req->send.length          = length;
140     req->send.rma.remote_addr = remote_addr;
141     req->send.rma.rkey        = rkey;
142     req->send.uct.func        = cb;
143     req->send.lane            = rkey->cache.rma_lane;
144     ucp_request_send_state_init(req, ucp_dt_make_contig(1), length);
145     ucp_request_send_state_reset(req,
146                                  (length < zcopy_thresh) ?
147                                  ucp_rma_request_bcopy_completion :
148                                  ucp_rma_request_zcopy_completion,
149                                  UCP_REQUEST_SEND_PROTO_RMA);
150 #if UCS_ENABLE_ASSERT
151     req->send.cb              = NULL;
152 #endif
153     if (length < zcopy_thresh) {
154         return UCS_OK;
155     }
156 
157     return ucp_request_send_buffer_reg_lane(req, req->send.lane, 0);
158 }
159 
160 static UCS_F_ALWAYS_INLINE ucs_status_ptr_t
ucp_rma_nonblocking(ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,uct_pending_callback_t progress_cb,size_t zcopy_thresh,const ucp_request_param_t * param)161 ucp_rma_nonblocking(ucp_ep_h ep, const void *buffer, size_t length,
162                     uint64_t remote_addr, ucp_rkey_h rkey,
163                     uct_pending_callback_t progress_cb, size_t zcopy_thresh,
164                     const ucp_request_param_t *param)
165 {
166     ucs_status_t status;
167     ucp_request_t *req;
168 
169     req = ucp_request_get_param(ep->worker, param,
170                                 {return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);});
171 
172     status = ucp_rma_request_init(req, ep, buffer, length, remote_addr, rkey,
173                                   progress_cb, zcopy_thresh, 0);
174     if (ucs_unlikely(status != UCS_OK)) {
175         return UCS_STATUS_PTR(status);
176     }
177 
178     return ucp_rma_send_request(req, param);
179 }
180 
ucp_put_nbi(ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey)181 ucs_status_t ucp_put_nbi(ucp_ep_h ep, const void *buffer, size_t length,
182                          uint64_t remote_addr, ucp_rkey_h rkey)
183 {
184     ucs_status_ptr_t status_ptr;
185 
186     status_ptr = ucp_put_nbx(ep, buffer, length, remote_addr, rkey,
187                              &ucp_request_null_param);
188     if (UCS_PTR_IS_PTR(status_ptr)) {
189         ucp_request_free(status_ptr);
190         return UCS_INPROGRESS;
191     }
192 
193     /* coverity[overflow] */
194     return UCS_PTR_STATUS(status_ptr);
195 }
196 
ucp_put_nb(ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,ucp_send_callback_t cb)197 ucs_status_ptr_t ucp_put_nb(ucp_ep_h ep, const void *buffer, size_t length,
198                             uint64_t remote_addr, ucp_rkey_h rkey,
199                             ucp_send_callback_t cb)
200 {
201     ucp_request_param_t param = {
202         .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK,
203         .cb.send      = (ucp_send_nbx_callback_t)cb
204     };
205 
206     return ucp_put_nbx(ep, buffer, length, remote_addr, rkey, &param);
207 }
208 
ucp_put_nbx(ucp_ep_h ep,const void * buffer,size_t count,uint64_t remote_addr,ucp_rkey_h rkey,const ucp_request_param_t * param)209 ucs_status_ptr_t ucp_put_nbx(ucp_ep_h ep, const void *buffer, size_t count,
210                              uint64_t remote_addr, ucp_rkey_h rkey,
211                              const ucp_request_param_t *param)
212 {
213     ucp_ep_rma_config_t *rma_config;
214     ucs_status_ptr_t ptr_status;
215     ucs_status_t status;
216 
217     UCP_RMA_CHECK_CONTIG1(param);
218     UCP_RMA_CHECK_PTR(ep->worker->context, buffer, count);
219     UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
220 
221     ucs_trace_req("put_nbx buffer %p count %zu remote_addr %"PRIx64" rkey %p to %s cb %p",
222                    buffer, count, remote_addr, rkey, ucp_ep_peer_name(ep),
223                    (param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) ?
224                    param->cb.send : NULL);
225 
226     status = UCP_RKEY_RESOLVE(rkey, ep, rma);
227     if (status != UCS_OK) {
228         ptr_status = UCS_STATUS_PTR(status);
229         goto out_unlock;
230     }
231 
232     /* Fast path for a single short message */
233     if (ucs_likely(!(param->op_attr_mask & UCP_OP_ATTR_FLAG_NO_IMM_CMPL) &&
234                     ((ssize_t)count <= rkey->cache.max_put_short))) {
235         status = UCS_PROFILE_CALL(uct_ep_put_short, ep->uct_eps[rkey->cache.rma_lane],
236                                   buffer, count, remote_addr, rkey->cache.rma_rkey);
237         if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
238             ptr_status = UCS_STATUS_PTR(status);
239             goto out_unlock;
240         }
241     }
242 
243     if (ucs_unlikely(param->op_attr_mask & UCP_OP_ATTR_FLAG_FORCE_IMM_CMPL)) {
244         ptr_status = UCS_STATUS_PTR(UCS_ERR_NO_RESOURCE);
245         goto out_unlock;
246     }
247 
248     rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
249     ptr_status = ucp_rma_nonblocking(ep, buffer, count, remote_addr, rkey,
250                                      rkey->cache.rma_proto->progress_put,
251                                      rma_config->put_zcopy_thresh, param);
252 out_unlock:
253     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
254     return ptr_status;
255 }
256 
ucp_get_nbi(ucp_ep_h ep,void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey)257 ucs_status_t ucp_get_nbi(ucp_ep_h ep, void *buffer, size_t length,
258                          uint64_t remote_addr, ucp_rkey_h rkey)
259 {
260     ucs_status_ptr_t status_ptr;
261 
262     status_ptr = ucp_get_nbx(ep, buffer, length, remote_addr, rkey,
263                              &ucp_request_null_param);
264     if (UCS_PTR_IS_PTR(status_ptr)) {
265         ucp_request_free(status_ptr);
266         return UCS_INPROGRESS;
267     }
268 
269     /* coverity[overflow] */
270     return UCS_PTR_STATUS(status_ptr);
271 }
272 
ucp_get_nb(ucp_ep_h ep,void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,ucp_send_callback_t cb)273 ucs_status_ptr_t ucp_get_nb(ucp_ep_h ep, void *buffer, size_t length,
274                             uint64_t remote_addr, ucp_rkey_h rkey,
275                             ucp_send_callback_t cb)
276 {
277     ucp_request_param_t param = {
278         .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK,
279         .cb.send      = (ucp_send_nbx_callback_t)cb
280     };
281 
282     return ucp_get_nbx(ep, buffer, length, remote_addr, rkey, &param);
283 }
284 
ucp_get_nbx(ucp_ep_h ep,void * buffer,size_t count,uint64_t remote_addr,ucp_rkey_h rkey,const ucp_request_param_t * param)285 ucs_status_ptr_t ucp_get_nbx(ucp_ep_h ep, void *buffer, size_t count,
286                              uint64_t remote_addr, ucp_rkey_h rkey,
287                              const ucp_request_param_t *param)
288 {
289     ucp_ep_rma_config_t *rma_config;
290     ucs_status_ptr_t ptr_status;
291     ucs_status_t status;
292 
293     UCP_RMA_CHECK_CONTIG1(param);
294 
295     if (ucs_unlikely(param->op_attr_mask & UCP_OP_ATTR_FLAG_FORCE_IMM_CMPL)) {
296         return UCS_STATUS_PTR(UCS_ERR_NO_RESOURCE);
297     }
298 
299     UCP_RMA_CHECK_PTR(ep->worker->context, buffer, count);
300     UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
301 
302     ucs_trace_req("get_nbx buffer %p count %zu remote_addr %"PRIx64" rkey %p from %s cb %p",
303                    buffer, count, remote_addr, rkey, ucp_ep_peer_name(ep),
304                    (param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) ?
305                    param->cb.send : NULL);
306 
307     status = UCP_RKEY_RESOLVE(rkey, ep, rma);
308     if (status != UCS_OK) {
309         ptr_status = UCS_STATUS_PTR(status);
310         goto out_unlock;
311     }
312 
313     rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
314     ptr_status = ucp_rma_nonblocking(ep, buffer, count, remote_addr, rkey,
315                                      rkey->cache.rma_proto->progress_get,
316                                      rma_config->get_zcopy_thresh, param);
317 out_unlock:
318     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
319     return ptr_status;
320 }
321 
322 UCS_PROFILE_FUNC(ucs_status_t, ucp_put, (ep, buffer, length, remote_addr, rkey),
323                  ucp_ep_h ep, const void *buffer, size_t length,
324                  uint64_t remote_addr, ucp_rkey_h rkey)
325 {
326     return ucp_rma_wait(ep->worker,
327                         ucp_put_nb(ep, buffer, length, remote_addr, rkey,
328                                    (ucp_send_callback_t)ucs_empty_function),
329                         "put");
330 }
331 
332 UCS_PROFILE_FUNC(ucs_status_t, ucp_get, (ep, buffer, length, remote_addr, rkey),
333                  ucp_ep_h ep, void *buffer, size_t length,
334                  uint64_t remote_addr, ucp_rkey_h rkey)
335 {
336     return ucp_rma_wait(ep->worker,
337                         ucp_get_nb(ep, buffer, length, remote_addr, rkey,
338                                    (ucp_send_callback_t)ucs_empty_function),
339                         "get");
340 }
341