1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2018. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #ifdef HAVE_CONFIG_H
8 # include "config.h"
9 #endif
10
11 #include "rma.h"
12 #include "rma.inl"
13
14 #include <ucp/core/ucp_mm.h>
15
16 #include <ucp/dt/dt_contig.h>
17 #include <ucs/profile/profile.h>
18 #include <ucs/sys/stubs.h>
19
20
/* Execute _action (typically a 'return') if the user buffer is NULL.
 * The check is compiled in only when ENABLE_PARAMS_CHECK is set. */
#define UCP_RMA_CHECK_BUFFER(_buffer, _action) \
    do { \
        if (ENABLE_PARAMS_CHECK && ucs_unlikely((_buffer) == NULL)) { \
            _action; \
        } \
    } while (0)
27
28
/* Execute _action if the operation length is zero; callers use this to
 * complete zero-length RMA operations immediately. */
#define UCP_RMA_CHECK_ZERO_LENGTH(_length, _action) \
    do { \
        if ((_length) == 0) { \
            _action; \
        } \
    } while (0)
35
36
/* Common parameter validation for RMA entry points returning ucs_status_t:
 * require the RMA feature, treat zero length as immediate success (UCS_OK),
 * and reject a NULL buffer with UCS_ERR_INVALID_PARAM. */
#define UCP_RMA_CHECK(_context, _buffer, _length) \
    do { \
        UCP_CONTEXT_CHECK_FEATURE_FLAGS(_context, UCP_FEATURE_RMA, \
                                        return UCS_ERR_INVALID_PARAM); \
        UCP_RMA_CHECK_ZERO_LENGTH(_length, return UCS_OK); \
        UCP_RMA_CHECK_BUFFER(_buffer, return UCS_ERR_INVALID_PARAM); \
    } while (0)
44
45
/* Same validation as UCP_RMA_CHECK, but for entry points returning
 * ucs_status_ptr_t: zero length returns NULL (immediate completion),
 * other failures return an error status pointer. */
#define UCP_RMA_CHECK_PTR(_context, _buffer, _length) \
    do { \
        UCP_CONTEXT_CHECK_FEATURE_FLAGS(_context, UCP_FEATURE_RMA, \
                                        return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); \
        UCP_RMA_CHECK_ZERO_LENGTH(_length, return NULL); \
        UCP_RMA_CHECK_BUFFER(_buffer, \
                             return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); \
    } while (0)
54
55
/* Reject any datatype other than contig(1): RMA nbx operations support only
 * contiguous byte buffers. Wrapped in do/while(0) like the sibling check
 * macros — the previous bare 'if' was a dangling-else hazard at call sites. */
#define UCP_RMA_CHECK_CONTIG1(_param) \
    do { \
        if (ucs_unlikely(ENABLE_PARAMS_CHECK && \
                         ((_param)->op_attr_mask & UCP_OP_ATTR_FIELD_DATATYPE) && \
                         ((_param)->datatype != ucp_dt_make_contig(1)))) { \
            return UCS_STATUS_PTR(UCS_ERR_UNSUPPORTED); \
        } \
    } while (0)
62
63
/* The request can be released if:
 * - all fragments were sent (length == 0) (bcopy & zcopy mix)
 * - all zcopy fragments are done (uct_comp.count == 0)
 * - and the request was allocated from the mpool
 *   (checked in ucp_request_complete_send)
 *
 * The request can be released either immediately or in the completion
 * callback. We must check the request length in the completion callback to
 * avoid the following scenario:
 *     partial_send; no_resource; progress;
 *     send_completed; callback called; request freed;
 *     next_partial_send; (oops - the request was already freed)
 */
/* Advance an RMA send request by the 'frag_length' bytes that were just
 * sent. Returns UCS_INPROGRESS while data remains, UCS_OK when all data was
 * sent (the request may already be completed/released at that point — see
 * the comment above), or an error status. UCS_ERR_NO_RESOURCE is passed
 * through WITHOUT completing the request, so the caller can re-queue it. */
ucs_status_t ucp_rma_request_advance(ucp_request_t *req, ssize_t frag_length,
                                     ucs_status_t status)
{
    ucs_assert(status != UCS_ERR_NOT_IMPLEMENTED);

    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        /* fatal error: deregister and complete; NO_RESOURCE means "retry" */
        if (status != UCS_ERR_NO_RESOURCE) {
            ucp_request_send_buffer_dereg(req);
            ucp_request_complete_send(req, status);
        }
        return status;
    }

    ucs_assert(frag_length >= 0);
    ucs_assert(req->send.length >= frag_length);
    req->send.length -= frag_length;
    if (req->send.length == 0) {
        /* bcopy is the fast path */
        if (ucs_likely(req->send.state.uct_comp.count == 0)) {
            /* no zcopy fragments in flight - complete here; otherwise the
             * zcopy completion callback will finish the request */
            ucp_request_send_buffer_dereg(req);
            ucp_request_complete_send(req, UCS_OK);
        }
        return UCS_OK;
    }
    /* move the local buffer and remote address past the sent fragment */
    req->send.buffer = UCS_PTR_BYTE_OFFSET(req->send.buffer, frag_length);
    req->send.rma.remote_addr += frag_length;
    return UCS_INPROGRESS;
}
105
/* UCT completion callback for bcopy RMA: completes the request only once
 * the whole payload has been handled (see the "release" comment above). */
static void ucp_rma_request_bcopy_completion(uct_completion_t *self,
                                             ucs_status_t status)
{
    ucp_request_t *request = ucs_container_of(self, ucp_request_t,
                                              send.state.uct_comp);

    if (request->send.length != request->send.state.dt.offset) {
        /* more fragments are still pending - do not complete yet */
        return;
    }

    ucp_request_complete_send(request, status);
}
116
/* UCT completion callback for zcopy RMA: like the bcopy variant, but also
 * deregisters the send buffer before completing the request. */
static void ucp_rma_request_zcopy_completion(uct_completion_t *self,
                                             ucs_status_t status)
{
    ucp_request_t *request = ucs_container_of(self, ucp_request_t,
                                              send.state.uct_comp);

    if (request->send.length != request->send.state.dt.offset) {
        /* more fragments are still pending - do not complete yet */
        return;
    }

    ucp_request_send_buffer_dereg(request);
    ucp_request_complete_send(request, status);
}
128
129 static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_rma_request_init(ucp_request_t * req,ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,uct_pending_callback_t cb,size_t zcopy_thresh,int flags)130 ucp_rma_request_init(ucp_request_t *req, ucp_ep_h ep, const void *buffer,
131 size_t length, uint64_t remote_addr, ucp_rkey_h rkey,
132 uct_pending_callback_t cb, size_t zcopy_thresh, int flags)
133 {
134 req->flags = flags; /* Implicit release */
135 req->send.ep = ep;
136 req->send.buffer = (void*)buffer;
137 req->send.datatype = ucp_dt_make_contig(1);
138 req->send.mem_type = UCS_MEMORY_TYPE_HOST;
139 req->send.length = length;
140 req->send.rma.remote_addr = remote_addr;
141 req->send.rma.rkey = rkey;
142 req->send.uct.func = cb;
143 req->send.lane = rkey->cache.rma_lane;
144 ucp_request_send_state_init(req, ucp_dt_make_contig(1), length);
145 ucp_request_send_state_reset(req,
146 (length < zcopy_thresh) ?
147 ucp_rma_request_bcopy_completion :
148 ucp_rma_request_zcopy_completion,
149 UCP_REQUEST_SEND_PROTO_RMA);
150 #if UCS_ENABLE_ASSERT
151 req->send.cb = NULL;
152 #endif
153 if (length < zcopy_thresh) {
154 return UCS_OK;
155 }
156
157 return ucp_request_send_buffer_reg_lane(req, req->send.lane, 0);
158 }
159
160 static UCS_F_ALWAYS_INLINE ucs_status_ptr_t
ucp_rma_nonblocking(ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,uct_pending_callback_t progress_cb,size_t zcopy_thresh,const ucp_request_param_t * param)161 ucp_rma_nonblocking(ucp_ep_h ep, const void *buffer, size_t length,
162 uint64_t remote_addr, ucp_rkey_h rkey,
163 uct_pending_callback_t progress_cb, size_t zcopy_thresh,
164 const ucp_request_param_t *param)
165 {
166 ucs_status_t status;
167 ucp_request_t *req;
168
169 req = ucp_request_get_param(ep->worker, param,
170 {return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);});
171
172 status = ucp_rma_request_init(req, ep, buffer, length, remote_addr, rkey,
173 progress_cb, zcopy_thresh, 0);
174 if (ucs_unlikely(status != UCS_OK)) {
175 return UCS_STATUS_PTR(status);
176 }
177
178 return ucp_rma_send_request(req, param);
179 }
180
ucp_put_nbi(ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey)181 ucs_status_t ucp_put_nbi(ucp_ep_h ep, const void *buffer, size_t length,
182 uint64_t remote_addr, ucp_rkey_h rkey)
183 {
184 ucs_status_ptr_t status_ptr;
185
186 status_ptr = ucp_put_nbx(ep, buffer, length, remote_addr, rkey,
187 &ucp_request_null_param);
188 if (UCS_PTR_IS_PTR(status_ptr)) {
189 ucp_request_free(status_ptr);
190 return UCS_INPROGRESS;
191 }
192
193 /* coverity[overflow] */
194 return UCS_PTR_STATUS(status_ptr);
195 }
196
ucp_put_nb(ucp_ep_h ep,const void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,ucp_send_callback_t cb)197 ucs_status_ptr_t ucp_put_nb(ucp_ep_h ep, const void *buffer, size_t length,
198 uint64_t remote_addr, ucp_rkey_h rkey,
199 ucp_send_callback_t cb)
200 {
201 ucp_request_param_t param = {
202 .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK,
203 .cb.send = (ucp_send_nbx_callback_t)cb
204 };
205
206 return ucp_put_nbx(ep, buffer, length, remote_addr, rkey, ¶m);
207 }
208
/* Non-blocking put: write 'count' bytes from 'buffer' to 'remote_addr' on
 * the peer identified by 'rkey'. Returns NULL or an error status pointer on
 * immediate completion, or a request pointer while in progress. Only
 * contiguous byte datatype is supported. */
ucs_status_ptr_t ucp_put_nbx(ucp_ep_h ep, const void *buffer, size_t count,
                             uint64_t remote_addr, ucp_rkey_h rkey,
                             const ucp_request_param_t *param)
{
    ucp_ep_rma_config_t *rma_config;
    ucs_status_ptr_t ptr_status;
    ucs_status_t status;

    /* parameter checks return directly, so they run before taking the lock */
    UCP_RMA_CHECK_CONTIG1(param);
    UCP_RMA_CHECK_PTR(ep->worker->context, buffer, count);
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    ucs_trace_req("put_nbx buffer %p count %zu remote_addr %"PRIx64" rkey %p to %s cb %p",
                  buffer, count, remote_addr, rkey, ucp_ep_peer_name(ep),
                  (param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) ?
                  param->cb.send : NULL);

    /* resolve the rkey to the RMA lane/protocol cached for this endpoint */
    status = UCP_RKEY_RESOLVE(rkey, ep, rma);
    if (status != UCS_OK) {
        ptr_status = UCS_STATUS_PTR(status);
        goto out_unlock;
    }

    /* Fast path for a single short message */
    if (ucs_likely(!(param->op_attr_mask & UCP_OP_ATTR_FLAG_NO_IMM_CMPL) &&
                   ((ssize_t)count <= rkey->cache.max_put_short))) {
        status = UCS_PROFILE_CALL(uct_ep_put_short, ep->uct_eps[rkey->cache.rma_lane],
                                  buffer, count, remote_addr, rkey->cache.rma_rkey);
        if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
            ptr_status = UCS_STATUS_PTR(status);
            goto out_unlock;
        }
        /* UCS_ERR_NO_RESOURCE: fall through to the request-based path */
    }

    /* caller demands immediate completion; the request path cannot provide it */
    if (ucs_unlikely(param->op_attr_mask & UCP_OP_ATTR_FLAG_FORCE_IMM_CMPL)) {
        ptr_status = UCS_STATUS_PTR(UCS_ERR_NO_RESOURCE);
        goto out_unlock;
    }

    rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
    ptr_status = ucp_rma_nonblocking(ep, buffer, count, remote_addr, rkey,
                                     rkey->cache.rma_proto->progress_put,
                                     rma_config->put_zcopy_thresh, param);
out_unlock:
    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
    return ptr_status;
}
256
ucp_get_nbi(ucp_ep_h ep,void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey)257 ucs_status_t ucp_get_nbi(ucp_ep_h ep, void *buffer, size_t length,
258 uint64_t remote_addr, ucp_rkey_h rkey)
259 {
260 ucs_status_ptr_t status_ptr;
261
262 status_ptr = ucp_get_nbx(ep, buffer, length, remote_addr, rkey,
263 &ucp_request_null_param);
264 if (UCS_PTR_IS_PTR(status_ptr)) {
265 ucp_request_free(status_ptr);
266 return UCS_INPROGRESS;
267 }
268
269 /* coverity[overflow] */
270 return UCS_PTR_STATUS(status_ptr);
271 }
272
ucp_get_nb(ucp_ep_h ep,void * buffer,size_t length,uint64_t remote_addr,ucp_rkey_h rkey,ucp_send_callback_t cb)273 ucs_status_ptr_t ucp_get_nb(ucp_ep_h ep, void *buffer, size_t length,
274 uint64_t remote_addr, ucp_rkey_h rkey,
275 ucp_send_callback_t cb)
276 {
277 ucp_request_param_t param = {
278 .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK,
279 .cb.send = (ucp_send_nbx_callback_t)cb
280 };
281
282 return ucp_get_nbx(ep, buffer, length, remote_addr, rkey, ¶m);
283 }
284
/* Non-blocking get: read 'count' bytes from 'remote_addr' on the peer
 * identified by 'rkey' into 'buffer'. Returns NULL or an error status
 * pointer on immediate completion, or a request pointer while in progress.
 * Only contiguous byte datatype is supported. */
ucs_status_ptr_t ucp_get_nbx(ucp_ep_h ep, void *buffer, size_t count,
                             uint64_t remote_addr, ucp_rkey_h rkey,
                             const ucp_request_param_t *param)
{
    ucp_ep_rma_config_t *rma_config;
    ucs_status_ptr_t ptr_status;
    ucs_status_t status;

    UCP_RMA_CHECK_CONTIG1(param);

    /* get has no short/immediate path, so FORCE_IMM_CMPL always fails here */
    if (ucs_unlikely(param->op_attr_mask & UCP_OP_ATTR_FLAG_FORCE_IMM_CMPL)) {
        return UCS_STATUS_PTR(UCS_ERR_NO_RESOURCE);
    }

    /* parameter checks return directly, so they run before taking the lock */
    UCP_RMA_CHECK_PTR(ep->worker->context, buffer, count);
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    ucs_trace_req("get_nbx buffer %p count %zu remote_addr %"PRIx64" rkey %p from %s cb %p",
                  buffer, count, remote_addr, rkey, ucp_ep_peer_name(ep),
                  (param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) ?
                  param->cb.send : NULL);

    /* resolve the rkey to the RMA lane/protocol cached for this endpoint */
    status = UCP_RKEY_RESOLVE(rkey, ep, rma);
    if (status != UCS_OK) {
        ptr_status = UCS_STATUS_PTR(status);
        goto out_unlock;
    }

    rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
    ptr_status = ucp_rma_nonblocking(ep, buffer, count, remote_addr, rkey,
                                     rkey->cache.rma_proto->progress_get,
                                     rma_config->get_zcopy_thresh, param);
out_unlock:
    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
    return ptr_status;
}
321
322 UCS_PROFILE_FUNC(ucs_status_t, ucp_put, (ep, buffer, length, remote_addr, rkey),
323 ucp_ep_h ep, const void *buffer, size_t length,
324 uint64_t remote_addr, ucp_rkey_h rkey)
325 {
326 return ucp_rma_wait(ep->worker,
327 ucp_put_nb(ep, buffer, length, remote_addr, rkey,
328 (ucp_send_callback_t)ucs_empty_function),
329 "put");
330 }
331
332 UCS_PROFILE_FUNC(ucs_status_t, ucp_get, (ep, buffer, length, remote_addr, rkey),
333 ucp_ep_h ep, void *buffer, size_t length,
334 uint64_t remote_addr, ucp_rkey_h rkey)
335 {
336 return ucp_rma_wait(ep->worker,
337 ucp_get_nb(ep, buffer, length, remote_addr, rkey,
338 (ucp_send_callback_t)ucs_empty_function),
339 "get");
340 }
341