1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7 
8 #include "test_ucp_tag.h"
9 
10 #include "ucp_datatype.h"
11 
12 extern "C" {
13 #include <ucp/core/ucp_worker.h>
14 #include <ucp/core/ucp_ep.h>
15 #include <ucp/core/ucp_ep.inl>
16 }
17 
18 
get_ctx_params()19 ucp_params_t test_ucp_tag::get_ctx_params() {
20     ucp_params_t params = ucp_test::get_ctx_params();
21     params.field_mask  |= UCP_PARAM_FIELD_FEATURES |
22                           UCP_PARAM_FIELD_REQUEST_INIT |
23                           UCP_PARAM_FIELD_REQUEST_SIZE;
24     params.features     = UCP_FEATURE_TAG;
25     params.request_size = sizeof(request);
26     params.request_init = request_init;
27     return params;
28 }
29 
init()30 void test_ucp_tag::init()
31 {
32     ucp_test::init();
33     sender().connect(&receiver(), get_ep_params());
34 
35     ctx_attr.field_mask = 0;
36     ctx_attr.field_mask |= UCP_ATTR_FIELD_REQUEST_SIZE;
37     ctx_attr.field_mask |= UCP_ATTR_FIELD_THREAD_MODE;
38     ucp_context_query(receiver().ucph(), &ctx_attr);
39 
40     ucp::dt_gen_start_count  = 0;
41     ucp::dt_gen_finish_count = 0;
42 }
43 
enable_tag_mp_offload()44 void test_ucp_tag::enable_tag_mp_offload()
45 {
46     m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "y"));
47     m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_MP_SRQ_ENABLE", "try"));
48     m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_MP_NUM_STRIDES", "8"));
49     m_env.push_back(new ucs::scoped_setenv("UCX_IB_MLX5_DEVX_OBJECTS",
50                                            "dct,dcsrq,rcsrq,rcqp"));
51 }
52 
request_init(void * request)53 void test_ucp_tag::request_init(void *request)
54 {
55     struct request *req = (struct request *)request;
56     req->completed       = false;
57     req->external        = false;
58     req->info.length     = 0;
59     req->info.sender_tag = 0;
60 }
61 
request_release(struct request * req)62 void test_ucp_tag::request_release(struct request *req)
63 {
64     if (req->external) {
65         free(req->req_mem);
66     } else {
67         req->completed = false;
68         ucp_request_release(req);
69     }
70 }
71 
request_free(struct request * req)72 void test_ucp_tag::request_free(struct request *req)
73 {
74     if (req->external) {
75         free(req->req_mem);
76     } else {
77         req->completed = false;
78         ucp_request_free(req);
79     }
80 }
81 
request_alloc()82 test_ucp_tag::request* test_ucp_tag::request_alloc()
83 {
84     void *mem = malloc(ctx_attr.request_size + sizeof(request));
85     request *req = (request*)((char*)mem + ctx_attr.request_size);
86     request_init(req);
87     req->external = true;
88     req->req_mem = mem;
89     return req;
90 }
91 
send_callback(void * request,ucs_status_t status)92 void test_ucp_tag::send_callback(void *request, ucs_status_t status)
93 {
94     struct request *req = (struct request *)request;
95     ucs_assert(req->completed == false);
96     req->status    = status;
97     req->completed = true;
98 }
99 
recv_callback(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)100 void test_ucp_tag::recv_callback(void *request, ucs_status_t status,
101                                  ucp_tag_recv_info_t *info)
102 {
103     struct request *req = (struct request *)request;
104     ucs_assert(req->completed == false);
105     req->status    = status;
106     req->completed = true;
107     if (status == UCS_OK) {
108         req->info      = *info;
109     }
110 }
111 
wait(request * req,int buf_index)112 void test_ucp_tag::wait(request *req, int buf_index)
113 {
114     int worker_index = get_worker_index(buf_index);
115 
116     if (is_external_request()) {
117         ucp_tag_recv_info_t tag_info;
118         ucs_status_t        status = ucp_request_test(req, &tag_info);
119 
120         while (status == UCS_INPROGRESS) {
121             progress(worker_index);
122             status = ucp_request_test(req, &tag_info);
123         }
124         if (req->external) {
125             recv_callback(req, status, &tag_info);
126         }
127     } else {
128         while (!req->completed) {
129             progress(worker_index);
130             if ((req->external) &&
131                 (ucp_request_check_status(req) == UCS_OK)) {
132                 return;
133             }
134         }
135     }
136 }
137 
wait_and_validate(request * req)138 void test_ucp_tag::wait_and_validate(request *req)
139 {
140     if (req == NULL) {
141         return;
142     }
143 
144     wait(req);
145     EXPECT_TRUE(req->completed);
146     EXPECT_EQ(UCS_OK, req->status);
147     request_release(req);
148 }
149 
wait_for_unexpected_msg(ucp_worker_h worker,double sec)150 void test_ucp_tag::wait_for_unexpected_msg(ucp_worker_h worker, double sec)
151 {
152     /* Wait for some message to be added to unexpected queue */
153     ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(sec);
154 
155     do {
156         short_progress_loop();
157     } while (ucp_tag_unexp_is_empty(&worker->tm) && (ucs_get_time() < timeout));
158 }
159 
check_offload_support(bool offload_required)160 void test_ucp_tag::check_offload_support(bool offload_required)
161 {
162     bool offload_supported = ucp_ep_is_tag_offload_enabled(ucp_ep_config(sender().ep()));
163     if (offload_supported != offload_required) {
164         cleanup();
165         std::string reason = offload_supported ? "tag offload" : "no tag offload";
166         UCS_TEST_SKIP_R(reason);
167     }
168 }
169 
get_worker_index(int buf_index)170 int test_ucp_tag::get_worker_index(int buf_index)
171 {
172     int worker_index = 0;
173     if (GetParam().thread_type == MULTI_THREAD_CONTEXT) {
174         worker_index = buf_index;
175     } else if (GetParam().thread_type == SINGLE_THREAD) {
176         ucs_assert((buf_index == 0) && (worker_index == 0));
177     }
178     return worker_index;
179 }
180 
181 test_ucp_tag::request *
send(entity & sender,send_type_t type,const void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,int buf_index)182 test_ucp_tag::send(entity &sender, send_type_t type, const void *buffer,
183                    size_t count, ucp_datatype_t datatype, ucp_tag_t tag,
184                    int buf_index)
185 {
186     int worker_index = get_worker_index(buf_index);
187     request *req;
188     ucs_status_t status;
189 
190     switch (type) {
191     case SEND_B:
192     case SEND_NB:
193         req = (request*)ucp_tag_send_nb(sender.ep(worker_index), buffer, count,
194                                         datatype, tag, send_callback);
195         if ((req != NULL) && (type == SEND_B)) {
196             wait(req, get_worker_index(buf_index));
197             request_release(req);
198             return NULL;
199         }
200 
201         if (UCS_PTR_IS_ERR(req)) {
202             ASSERT_UCS_OK(UCS_PTR_STATUS(req));
203         }
204         break;
205     case SEND_NBR:
206         req = request_alloc();
207         status = ucp_tag_send_nbr(sender.ep(worker_index), buffer, count,
208                                   datatype, tag, req);
209         ASSERT_UCS_OK_OR_INPROGRESS(status);
210         if (status == UCS_OK) {
211             request_free(req);
212             return (request*)UCS_STATUS_PTR(UCS_OK);
213         }
214         break;
215     case SEND_SYNC_NB:
216         return (request*)ucp_tag_send_sync_nb(sender.ep(worker_index), buffer,
217                                               count, datatype, tag, send_callback);
218     default:
219         return NULL;
220     }
221 
222     return req;
223 }
224 
225 test_ucp_tag::request *
send_nb(const void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,int buf_index)226 test_ucp_tag::send_nb(const void *buffer, size_t count, ucp_datatype_t datatype,
227                       ucp_tag_t tag, int buf_index)
228 {
229     return send(sender(), SEND_NB, buffer, count, datatype, tag, buf_index);
230 }
231 
232 test_ucp_tag::request *
send_nbr(const void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,int buf_index)233 test_ucp_tag::send_nbr(const void *buffer, size_t count,
234                        ucp_datatype_t datatype,
235                        ucp_tag_t tag, int buf_index)
236 {
237     return send(sender(), SEND_NBR, buffer, count, datatype, tag, buf_index);
238 }
239 
240 
send_b(const void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,int buf_index)241 void test_ucp_tag::send_b(const void *buffer, size_t count, ucp_datatype_t datatype,
242                           ucp_tag_t tag, int buf_index)
243 {
244     send(sender(), SEND_B, buffer, count, datatype, tag, buf_index);
245 }
246 
247 test_ucp_tag::request *
send_sync_nb(const void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,int buf_index)248 test_ucp_tag::send_sync_nb(const void *buffer, size_t count, ucp_datatype_t datatype,
249                            ucp_tag_t tag, int buf_index)
250 {
251     return send(sender(), SEND_SYNC_NB, buffer, count, datatype, tag, buf_index);
252 }
253 
254 test_ucp_tag::request*
recv(entity & receiver,recv_type_t type,void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,ucp_tag_t tag_mask,ucp_tag_recv_info_t * info,int buf_index)255 test_ucp_tag::recv(entity &receiver, recv_type_t type, void *buffer,
256                    size_t count, ucp_datatype_t datatype,
257                    ucp_tag_t tag, ucp_tag_t tag_mask,
258                    ucp_tag_recv_info_t *info, int buf_index)
259 {
260     int worker_index = get_worker_index(buf_index);
261     request *req;
262     ucs_status_t status;
263 
264     switch (type) {
265     case RECV_B:
266     case RECV_NB:
267         req = (request*)ucp_tag_recv_nb(receiver.worker(worker_index), buffer, count,
268                                         datatype, tag, tag_mask, recv_callback);
269         if (type == RECV_NB) {
270             if (UCS_PTR_IS_ERR(req)) {
271                 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
272             } else if (req == NULL) {
273                 UCS_TEST_ABORT("ucp_tag_recv_nb returned NULL");
274             }
275         } else {
276             if (UCS_PTR_IS_ERR(req)) {
277                 return req;
278             } else if (req == NULL) {
279                 UCS_TEST_ABORT("ucp_tag_recv_nb returned NULL");
280             } else {
281                 wait(req, worker_index);
282                 status = req->status;
283                 *info  = req->info;
284                 request_release(req);
285                 return (request*)UCS_STATUS_PTR(status);
286             }
287         }
288         break;
289     case RECV_BR:
290     case RECV_NBR:
291         req = request_alloc();
292         status = ucp_tag_recv_nbr(receiver.worker(worker_index), buffer,
293                                   count, datatype, tag, tag_mask, req);
294         if (type == RECV_NBR) {
295             if (UCS_STATUS_IS_ERR(status)) {
296                 UCS_TEST_ABORT("ucp_tag_recv_nb returned status " <<
297                                ucs_status_string(status));
298             }
299         } else {
300             if (!UCS_STATUS_IS_ERR(status)) {
301                 wait(req, worker_index);
302                 status = req->status;
303                 *info  = req->info;
304                 request_release(req);
305                 return (request*)UCS_STATUS_PTR(status);
306             }
307         }
308         break;
309     default:
310         return NULL;
311     }
312 
313     return req;
314 }
315 
316 test_ucp_tag::request*
recv_nb(void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,ucp_tag_t tag_mask,int buf_index)317 test_ucp_tag::recv_nb(void *buffer, size_t count, ucp_datatype_t datatype,
318                       ucp_tag_t tag, ucp_tag_t tag_mask, int buf_index)
319 {
320     recv_type_t type = is_external_request() ? RECV_NBR : RECV_NB;
321     return recv(receiver(), type, buffer, count, datatype,
322                 tag, tag_mask, NULL, buf_index);
323 }
324 
325 ucs_status_t
recv_b(void * buffer,size_t count,ucp_datatype_t datatype,ucp_tag_t tag,ucp_tag_t tag_mask,ucp_tag_recv_info_t * info,int buf_index)326 test_ucp_tag::recv_b(void *buffer, size_t count, ucp_datatype_t datatype,
327                      ucp_tag_t tag, ucp_tag_t tag_mask,
328                      ucp_tag_recv_info_t *info, int buf_index)
329 {
330     recv_type_t type = is_external_request() ? RECV_BR : RECV_B;
331     request* req = recv(receiver(), type, buffer, count, datatype,
332                         tag, tag_mask, info, buf_index);
333     return UCS_PTR_STATUS(req);
334 }
335 
is_external_request()336 bool test_ucp_tag::is_external_request()
337 {
338     return false;
339 }
340 
/* Definition of the static member holding the context attributes queried in
 * init(); request_alloc() reads its request_size field. */
ucp_context_attr_t test_ucp_tag::ctx_attr;
342 
343 
344 class test_ucp_tag_limits : public test_ucp_tag {
345 public:
test_ucp_tag_limits()346     test_ucp_tag_limits() {
347         m_test_offload = GetParam().variant;
348         m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE",
349                                                ucs::to_string(m_test_offload).c_str()));
350     }
351 
init()352     void init() {
353         test_ucp_tag::init();
354         check_offload_support(m_test_offload);
355     }
356 
357     std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)358     static enum_test_params(const ucp_params_t& ctx_params,
359                             const std::string& name,
360                             const std::string& test_case_name,
361                             const std::string& tls)
362     {
363         std::vector<ucp_test_param> result;
364         generate_test_params_variant(ctx_params, name, test_case_name,
365                                      tls, false, result);
366         generate_test_params_variant(ctx_params, name, test_case_name + "/offload",
367                                      tls, true, result);
368         return result;
369     }
370 
371 protected:
372     bool m_test_offload;
373 };
374 
375 UCS_TEST_P(test_ucp_tag_limits, check_max_short_rndv_thresh_zero, "RNDV_THRESH=0") {
376     size_t max_short =
377         static_cast<size_t>(ucp_ep_config(sender().ep())->tag.eager.max_short + 1);
378 
379     // (maximal short + 1) <= RNDV thresh
380     EXPECT_LE(max_short,
381               ucp_ep_config(sender().ep())->tag.rndv.am_thresh);
382     EXPECT_LE(max_short,
383               ucp_ep_config(sender().ep())->tag.rndv.rma_thresh);
384 
385     // (maximal short + 1) <= RNDV send_nbr thresh
386     EXPECT_LE(max_short,
387               ucp_ep_config(sender().ep())->tag.rndv_send_nbr.am_thresh);
388     EXPECT_LE(max_short,
389               ucp_ep_config(sender().ep())->tag.rndv_send_nbr.rma_thresh);
390 
391     if (m_test_offload) {
392         // There is a lower bound for rndv threshold with tag offload. We should
393         // not send messages smaller than SW RNDV request size, because receiver
394         // may temporarily store this request in the user buffer (which will
395         // result in crash if the request does not fit user buffer).
396         size_t min_rndv = ucp_ep_tag_offload_min_rndv_thresh(ucp_ep_config(sender().ep()));
397 
398         EXPECT_GT(min_rndv, 0ul); // min_rndv should be RTS size at least
399         EXPECT_GE(min_rndv,
400                   ucp_ep_config(sender().ep())->tag.rndv_send_nbr.am_thresh);
401         EXPECT_GE(min_rndv,
402                   ucp_ep_config(sender().ep())->tag.rndv_send_nbr.rma_thresh);
403     }
404 }
405 
406 UCS_TEST_P(test_ucp_tag_limits, check_max_short_zcopy_thresh_zero, "ZCOPY_THRESH=0") {
407     size_t max_short =
408         static_cast<size_t>(ucp_ep_config(sender().ep())->tag.eager.max_short + 1);
409 
410     // (maximal short + 1) <= ZCOPY thresh
411     EXPECT_LE(max_short,
412               ucp_ep_config(sender().ep())->tag.eager.zcopy_thresh[0]);
413 }
414 
/* Instantiate the test_ucp_tag_limits tests.
 * NOTE(review): the set of instantiations is decided by the macro, which is
 * defined outside this file - confirm there. */
UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_limits)
416