1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7
8 #include "test_ucp_tag.h"
9
10 #include "ucp_datatype.h"
11
12 extern "C" {
13 #include <ucp/core/ucp_worker.h>
14 #include <ucp/core/ucp_ep.h>
15 #include <ucp/core/ucp_ep.inl>
16 }
17
18
get_ctx_params()19 ucp_params_t test_ucp_tag::get_ctx_params() {
20 ucp_params_t params = ucp_test::get_ctx_params();
21 params.field_mask |= UCP_PARAM_FIELD_FEATURES |
22 UCP_PARAM_FIELD_REQUEST_INIT |
23 UCP_PARAM_FIELD_REQUEST_SIZE;
24 params.features = UCP_FEATURE_TAG;
25 params.request_size = sizeof(request);
26 params.request_init = request_init;
27 return params;
28 }
29
init()30 void test_ucp_tag::init()
31 {
32 ucp_test::init();
33 sender().connect(&receiver(), get_ep_params());
34
35 ctx_attr.field_mask = 0;
36 ctx_attr.field_mask |= UCP_ATTR_FIELD_REQUEST_SIZE;
37 ctx_attr.field_mask |= UCP_ATTR_FIELD_THREAD_MODE;
38 ucp_context_query(receiver().ucph(), &ctx_attr);
39
40 ucp::dt_gen_start_count = 0;
41 ucp::dt_gen_finish_count = 0;
42 }
43
enable_tag_mp_offload()44 void test_ucp_tag::enable_tag_mp_offload()
45 {
46 m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "y"));
47 m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_MP_SRQ_ENABLE", "try"));
48 m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_MP_NUM_STRIDES", "8"));
49 m_env.push_back(new ucs::scoped_setenv("UCX_IB_MLX5_DEVX_OBJECTS",
50 "dct,dcsrq,rcsrq,rcqp"));
51 }
52
request_init(void * request)53 void test_ucp_tag::request_init(void *request)
54 {
55 struct request *req = (struct request *)request;
56 req->completed = false;
57 req->external = false;
58 req->info.length = 0;
59 req->info.sender_tag = 0;
60 }
61
request_release(struct request * req)62 void test_ucp_tag::request_release(struct request *req)
63 {
64 if (req->external) {
65 free(req->req_mem);
66 } else {
67 req->completed = false;
68 ucp_request_release(req);
69 }
70 }
71
request_free(struct request * req)72 void test_ucp_tag::request_free(struct request *req)
73 {
74 if (req->external) {
75 free(req->req_mem);
76 } else {
77 req->completed = false;
78 ucp_request_free(req);
79 }
80 }
81
request_alloc()82 test_ucp_tag::request* test_ucp_tag::request_alloc()
83 {
84 void *mem = malloc(ctx_attr.request_size + sizeof(request));
85 request *req = (request*)((char*)mem + ctx_attr.request_size);
86 request_init(req);
87 req->external = true;
88 req->req_mem = mem;
89 return req;
90 }
91
send_callback(void * request,ucs_status_t status)92 void test_ucp_tag::send_callback(void *request, ucs_status_t status)
93 {
94 struct request *req = (struct request *)request;
95 ucs_assert(req->completed == false);
96 req->status = status;
97 req->completed = true;
98 }
99
recv_callback(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)100 void test_ucp_tag::recv_callback(void *request, ucs_status_t status,
101 ucp_tag_recv_info_t *info)
102 {
103 struct request *req = (struct request *)request;
104 ucs_assert(req->completed == false);
105 req->status = status;
106 req->completed = true;
107 if (status == UCS_OK) {
108 req->info = *info;
109 }
110 }
111
wait(request * req,int buf_index)112 void test_ucp_tag::wait(request *req, int buf_index)
113 {
114 int worker_index = get_worker_index(buf_index);
115
116 if (is_external_request()) {
117 ucp_tag_recv_info_t tag_info;
118 ucs_status_t status = ucp_request_test(req, &tag_info);
119
120 while (status == UCS_INPROGRESS) {
121 progress(worker_index);
122 status = ucp_request_test(req, &tag_info);
123 }
124 if (req->external) {
125 recv_callback(req, status, &tag_info);
126 }
127 } else {
128 while (!req->completed) {
129 progress(worker_index);
130 if ((req->external) &&
131 (ucp_request_check_status(req) == UCS_OK)) {
132 return;
133 }
134 }
135 }
136 }
137
wait_and_validate(request * req)138 void test_ucp_tag::wait_and_validate(request *req)
139 {
140 if (req == NULL) {
141 return;
142 }
143
144 wait(req);
145 EXPECT_TRUE(req->completed);
146 EXPECT_EQ(UCS_OK, req->status);
147 request_release(req);
148 }
149
wait_for_unexpected_msg(ucp_worker_h worker,double sec)150 void test_ucp_tag::wait_for_unexpected_msg(ucp_worker_h worker, double sec)
151 {
152 /* Wait for some message to be added to unexpected queue */
153 ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(sec);
154
155 do {
156 short_progress_loop();
157 } while (ucp_tag_unexp_is_empty(&worker->tm) && (ucs_get_time() < timeout));
158 }
159
check_offload_support(bool offload_required)160 void test_ucp_tag::check_offload_support(bool offload_required)
161 {
162 bool offload_supported = ucp_ep_is_tag_offload_enabled(ucp_ep_config(sender().ep()));
163 if (offload_supported != offload_required) {
164 cleanup();
165 std::string reason = offload_supported ? "tag offload" : "no tag offload";
166 UCS_TEST_SKIP_R(reason);
167 }
168 }
169
get_worker_index(int buf_index)170 int test_ucp_tag::get_worker_index(int buf_index)
171 {
172 int worker_index = 0;
173 if (GetParam().thread_type == MULTI_THREAD_CONTEXT) {
174 worker_index = buf_index;
175 } else if (GetParam().thread_type == SINGLE_THREAD) {
176 ucs_assert((buf_index == 0) && (worker_index == 0));
177 }
178 return worker_index;
179 }
180
181 test_ucp_tag::request *
send(entity & sender,send_type_t type,const void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,int buf_index)182 test_ucp_tag::send(entity &sender, send_type_t type, const void *buffer,
183 size_t count, ucp_datatype_t datatype, ucp_tag_t tag,
184 int buf_index)
185 {
186 int worker_index = get_worker_index(buf_index);
187 request *req;
188 ucs_status_t status;
189
190 switch (type) {
191 case SEND_B:
192 case SEND_NB:
193 req = (request*)ucp_tag_send_nb(sender.ep(worker_index), buffer, count,
194 datatype, tag, send_callback);
195 if ((req != NULL) && (type == SEND_B)) {
196 wait(req, get_worker_index(buf_index));
197 request_release(req);
198 return NULL;
199 }
200
201 if (UCS_PTR_IS_ERR(req)) {
202 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
203 }
204 break;
205 case SEND_NBR:
206 req = request_alloc();
207 status = ucp_tag_send_nbr(sender.ep(worker_index), buffer, count,
208 datatype, tag, req);
209 ASSERT_UCS_OK_OR_INPROGRESS(status);
210 if (status == UCS_OK) {
211 request_free(req);
212 return (request*)UCS_STATUS_PTR(UCS_OK);
213 }
214 break;
215 case SEND_SYNC_NB:
216 return (request*)ucp_tag_send_sync_nb(sender.ep(worker_index), buffer,
217 count, datatype, tag, send_callback);
218 default:
219 return NULL;
220 }
221
222 return req;
223 }
224
/* Non-blocking tagged send on the sender entity; returns the in-flight
 * request or NULL on inline completion. */
test_ucp_tag::request *
test_ucp_tag::send_nb(const void *buffer, size_t count, ucp_datatype_t datatype,
                      ucp_tag_t tag, int buf_index)
{
    return send(sender(), SEND_NB, buffer, count, datatype, tag, buf_index);
}
231
/* Non-blocking tagged send using an externally-allocated request
 * (ucp_tag_send_nbr flavor). */
test_ucp_tag::request *
test_ucp_tag::send_nbr(const void *buffer, size_t count,
                       ucp_datatype_t datatype,
                       ucp_tag_t tag, int buf_index)
{
    return send(sender(), SEND_NBR, buffer, count, datatype, tag, buf_index);
}
239
240
/* Blocking tagged send: posts the send and waits for its completion. */
void test_ucp_tag::send_b(const void *buffer, size_t count, ucp_datatype_t datatype,
                          ucp_tag_t tag, int buf_index)
{
    send(sender(), SEND_B, buffer, count, datatype, tag, buf_index);
}
246
/* Non-blocking synchronous tagged send (completes only after the receiver
 * has matched the message). */
test_ucp_tag::request *
test_ucp_tag::send_sync_nb(const void *buffer, size_t count, ucp_datatype_t datatype,
                           ucp_tag_t tag, int buf_index)
{
    return send(sender(), SEND_SYNC_NB, buffer, count, datatype, tag, buf_index);
}
253
254 test_ucp_tag::request*
recv(entity & receiver,recv_type_t type,void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,ucp_tag_t tag_mask,ucp_tag_recv_info_t * info,int buf_index)255 test_ucp_tag::recv(entity &receiver, recv_type_t type, void *buffer,
256 size_t count, ucp_datatype_t datatype,
257 ucp_tag_t tag, ucp_tag_t tag_mask,
258 ucp_tag_recv_info_t *info, int buf_index)
259 {
260 int worker_index = get_worker_index(buf_index);
261 request *req;
262 ucs_status_t status;
263
264 switch (type) {
265 case RECV_B:
266 case RECV_NB:
267 req = (request*)ucp_tag_recv_nb(receiver.worker(worker_index), buffer, count,
268 datatype, tag, tag_mask, recv_callback);
269 if (type == RECV_NB) {
270 if (UCS_PTR_IS_ERR(req)) {
271 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
272 } else if (req == NULL) {
273 UCS_TEST_ABORT("ucp_tag_recv_nb returned NULL");
274 }
275 } else {
276 if (UCS_PTR_IS_ERR(req)) {
277 return req;
278 } else if (req == NULL) {
279 UCS_TEST_ABORT("ucp_tag_recv_nb returned NULL");
280 } else {
281 wait(req, worker_index);
282 status = req->status;
283 *info = req->info;
284 request_release(req);
285 return (request*)UCS_STATUS_PTR(status);
286 }
287 }
288 break;
289 case RECV_BR:
290 case RECV_NBR:
291 req = request_alloc();
292 status = ucp_tag_recv_nbr(receiver.worker(worker_index), buffer,
293 count, datatype, tag, tag_mask, req);
294 if (type == RECV_NBR) {
295 if (UCS_STATUS_IS_ERR(status)) {
296 UCS_TEST_ABORT("ucp_tag_recv_nb returned status " <<
297 ucs_status_string(status));
298 }
299 } else {
300 if (!UCS_STATUS_IS_ERR(status)) {
301 wait(req, worker_index);
302 status = req->status;
303 *info = req->info;
304 request_release(req);
305 return (request*)UCS_STATUS_PTR(status);
306 }
307 }
308 break;
309 default:
310 return NULL;
311 }
312
313 return req;
314 }
315
316 test_ucp_tag::request*
recv_nb(void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,ucp_tag_t tag_mask,int buf_index)317 test_ucp_tag::recv_nb(void *buffer, size_t count, ucp_datatype_t datatype,
318 ucp_tag_t tag, ucp_tag_t tag_mask, int buf_index)
319 {
320 recv_type_t type = is_external_request() ? RECV_NBR : RECV_NB;
321 return recv(receiver(), type, buffer, count, datatype,
322 tag, tag_mask, NULL, buf_index);
323 }
324
325 ucs_status_t
recv_b(void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,ucp_tag_t tag_mask,ucp_tag_recv_info_t * info,int buf_index)326 test_ucp_tag::recv_b(void *buffer, size_t count, ucp_datatype_t datatype,
327 ucp_tag_t tag, ucp_tag_t tag_mask,
328 ucp_tag_recv_info_t *info, int buf_index)
329 {
330 recv_type_t type = is_external_request() ? RECV_BR : RECV_B;
331 request* req = recv(receiver(), type, buffer, count, datatype,
332 tag, tag_mask, info, buf_index);
333 return UCS_PTR_STATUS(req);
334 }
335
/* Whether this fixture uses externally-allocated requests (the *_nbr API
 * flavors). The base fixture does not; derived fixtures may override. */
bool test_ucp_tag::is_external_request()
{
    return false;
}
340
/* Context attributes (request size, thread mode) queried once in init() */
ucp_context_attr_t test_ucp_tag::ctx_attr;
342
343
/* Tests for eager/rndv/zcopy threshold consistency; each test is generated
 * in two variants - with and without hardware tag offload. */
class test_ucp_tag_limits : public test_ucp_tag {
public:
    test_ucp_tag_limits() {
        /* The test variant selects whether RC tag offload is requested */
        m_test_offload = GetParam().variant;
        m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE",
                                               ucs::to_string(m_test_offload).c_str()));
    }

    void init() {
        test_ucp_tag::init();
        /* Skip when the transport cannot provide the requested offload mode */
        check_offload_support(m_test_offload);
    }

    std::vector<ucp_test_param>
    static enum_test_params(const ucp_params_t& ctx_params,
                            const std::string& name,
                            const std::string& test_case_name,
                            const std::string& tls)
    {
        /* Generate every test twice: plain, and with "/offload" variant */
        std::vector<ucp_test_param> result;
        generate_test_params_variant(ctx_params, name, test_case_name,
                                     tls, false, result);
        generate_test_params_variant(ctx_params, name, test_case_name + "/offload",
                                     tls, true, result);
        return result;
    }

protected:
    bool m_test_offload;    /* true when running the offload variant */
};
374
375 UCS_TEST_P(test_ucp_tag_limits, check_max_short_rndv_thresh_zero, "RNDV_THRESH=0") {
376 size_t max_short =
377 static_cast<size_t>(ucp_ep_config(sender().ep())->tag.eager.max_short + 1);
378
379 // (maximal short + 1) <= RNDV thresh
380 EXPECT_LE(max_short,
381 ucp_ep_config(sender().ep())->tag.rndv.am_thresh);
382 EXPECT_LE(max_short,
383 ucp_ep_config(sender().ep())->tag.rndv.rma_thresh);
384
385 // (maximal short + 1) <= RNDV send_nbr thresh
386 EXPECT_LE(max_short,
387 ucp_ep_config(sender().ep())->tag.rndv_send_nbr.am_thresh);
388 EXPECT_LE(max_short,
389 ucp_ep_config(sender().ep())->tag.rndv_send_nbr.rma_thresh);
390
391 if (m_test_offload) {
392 // There is a lower bound for rndv threshold with tag offload. We should
393 // not send messages smaller than SW RNDV request size, because receiver
394 // may temporarily store this request in the user buffer (which will
395 // result in crash if the request does not fit user buffer).
396 size_t min_rndv = ucp_ep_tag_offload_min_rndv_thresh(ucp_ep_config(sender().ep()));
397
398 EXPECT_GT(min_rndv, 0ul); // min_rndv should be RTS size at least
399 EXPECT_GE(min_rndv,
400 ucp_ep_config(sender().ep())->tag.rndv_send_nbr.am_thresh);
401 EXPECT_GE(min_rndv,
402 ucp_ep_config(sender().ep())->tag.rndv_send_nbr.rma_thresh);
403 }
404 }
405
406 UCS_TEST_P(test_ucp_tag_limits, check_max_short_zcopy_thresh_zero, "ZCOPY_THRESH=0") {
407 size_t max_short =
408 static_cast<size_t>(ucp_ep_config(sender().ep())->tag.eager.max_short + 1);
409
410 // (maximal short + 1) <= ZCOPY thresh
411 EXPECT_LE(max_short,
412 ucp_ep_config(sender().ep())->tag.eager.zcopy_thresh[0]);
413 }
414
/* Instantiate the limits tests over all supported transports */
UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_limits)