1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2017-2019. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include "test_ucp_tag.h"
8
9 #include "ucp_datatype.h"
10
11 extern "C" {
12 #include <ucp/core/ucp_ep.inl>
13 #include <ucp/core/ucp_worker.h>
14 #include <ucp/tag/tag_match.h>
15 }
16
// Instantiate a test case only for the transports which support HW tag
// matching offload (the accelerated DC and RC verbs transports).
#define UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(_test_case) \
        UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, dcx, "dc_x") \
        UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, rcx, "rc_x")
20
21 class test_ucp_tag_offload : public test_ucp_tag {
22 public:
test_ucp_tag_offload()23 test_ucp_tag_offload() {
24 // TODO: test offload and offload MP as different variants
25 enable_tag_mp_offload();
26 }
27
init()28 void init()
29 {
30 test_ucp_tag::init();
31 check_offload_support(true);
32 }
33
recv_nb_and_check(void * buffer,size_t count,ucp_datatype_t dt,ucp_tag_t tag,ucp_tag_t tag_mask)34 request* recv_nb_and_check(void *buffer, size_t count, ucp_datatype_t dt,
35 ucp_tag_t tag, ucp_tag_t tag_mask)
36 {
37 request *req = recv_nb(buffer, count, dt, tag, tag_mask);
38 EXPECT_TRUE(!UCS_PTR_IS_ERR(req));
39 EXPECT_TRUE(req != NULL);
40 return req;
41 }
42
recv_nb_exp(void * buffer,size_t count,ucp_datatype_t dt,ucp_tag_t tag,ucp_tag_t tag_mask)43 request* recv_nb_exp(void *buffer, size_t count, ucp_datatype_t dt,
44 ucp_tag_t tag, ucp_tag_t tag_mask)
45 {
46 request *req1 = recv_nb_and_check(buffer, count, DATATYPE, tag,
47 UCP_TAG_MASK_FULL);
48
49 // Post and cancel another receive to make sure the first one was offloaded
50 size_t size = receiver().worker()->context->config.ext.tm_thresh + 1;
51 std::vector<char> tbuf(size, 0);
52 request *req2 = recv_nb_and_check(&tbuf[0], size, DATATYPE, tag,
53 UCP_TAG_MASK_FULL);
54 req_cancel(receiver(), req2);
55
56 return req1;
57 }
58
send_recv(entity & se,ucp_tag_t tag,size_t length)59 void send_recv(entity &se, ucp_tag_t tag, size_t length)
60 {
61 std::vector<uint8_t> sendbuf(length);
62 std::vector<uint8_t> recvbuf(length);
63
64 request *rreq = recv_nb_exp(&recvbuf[0], length, DATATYPE, tag,
65 UCP_TAG_MASK_FULL);
66
67 request *sreq = (request*)ucp_tag_send_nb(se.ep(), &sendbuf[0], length,
68 DATATYPE, tag, send_callback);
69 if (UCS_PTR_IS_ERR(sreq)) {
70 ASSERT_UCS_OK(UCS_PTR_STATUS(sreq));
71 } else if (sreq != NULL) {
72 wait(sreq);
73 request_free(sreq);
74 }
75
76 wait(rreq);
77 request_free(rreq);
78 }
79
activate_offload(entity & se,ucp_tag_t tag=0x11)80 void activate_offload(entity &se, ucp_tag_t tag = 0x11)
81 {
82 send_recv(se, tag, receiver().worker()->context->config.ext.tm_thresh);
83 }
84
req_cancel(entity & e,request * req)85 void req_cancel(entity &e, request *req)
86 {
87 ucp_request_cancel(e.worker(), req);
88 wait(req);
89 request_free(req);
90 }
91 };
92
// Check SW expected-queue accounting: a small (below TM_THRESH) receive is
// tracked in SW, and after it is canceled a big receive is offloaded to HW.
UCS_TEST_P(test_ucp_tag_offload, post_after_cancel)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag      = 0x11;
    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    // Small receive is handled in SW
    request *req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                     tag, UCP_TAG_MASK_FULL);

    EXPECT_EQ(1u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req);
    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);

    // Fix: pass the element storage (&recvbuf[0]), not the address of the
    // vector object itself, as the receive buffer.
    req = recv_nb_and_check(&recvbuf[0], recvbuf.size(), DATATYPE, tag,
                            UCP_TAG_MASK_FULL);

    // The big receive should be posted to the HW, not accounted in SW
    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req);
}
114
// Same as post_after_cancel, but the small SW request is completed by an
// incoming message instead of being canceled.
UCS_TEST_P(test_ucp_tag_offload, post_after_comp)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag      = 0x11;
    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    // Small receive is handled in SW
    request *req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                     tag, UCP_TAG_MASK_FULL);

    EXPECT_EQ(1u, receiver().worker()->tm.expected.sw_all_count);

    // Complete it with a matching send
    send_b(&small_val, sizeof(small_val), DATATYPE, tag);
    wait(req);
    request_free(req);
    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);

    // Fix: pass the element storage (&recvbuf[0]), not the address of the
    // vector object itself, as the receive buffer.
    req = recv_nb_and_check(&recvbuf[0], recvbuf.size(), DATATYPE, tag,
                            UCP_TAG_MASK_FULL);

    // The big receive should be posted to the HW, not accounted in SW
    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req);
}
139
// Check that a wildcard receive forces SW handling for subsequent receives,
// even ones with a different tag.
UCS_TEST_P(test_ucp_tag_offload, post_wild)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag1     = 0x11; // these two tags should go to different
    ucp_tag_t tag2     = 0x13; // hash buckets in the TM expected queue
    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    // Wildcard receive (zero tag mask) must stay in SW
    request *req1 = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                      tag1, 0);
    EXPECT_EQ(1u, receiver().worker()->tm.expected.sw_all_count);

    // Fix: pass the element storage (&recvbuf[0]), not the address of the
    // vector object itself, as the receive buffer.
    request *req2 = recv_nb_and_check(&recvbuf[0], recvbuf.size(), DATATYPE,
                                      tag2, UCP_TAG_MASK_FULL);
    // Second request should not be posted as well. Even though it has another
    // tag, the first request is a wildcard, which needs to be handled in SW,
    // so it blocks all other requests
    EXPECT_EQ(2u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req1);
    req_cancel(receiver(), req2);
}
162
// Check that requests blocked in SW for one tag do not block offloading of
// requests with a different tag (different hash bucket).
UCS_TEST_P(test_ucp_tag_offload, post_dif_buckets)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag1     = 0x11; // these two tags should go to different
    ucp_tag_t tag2     = 0x13; // hash buckets in the TM expected queue
    std::vector<request*> reqs;
    request *req;

    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    // Small receive stays in SW
    req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE, tag1,
                            UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Fix: pass the element storage (&recvbuf[0]), not the address of the
    // vector object itself, as the receive buffer (here and below).
    req = recv_nb_and_check(&recvbuf[0], recvbuf.size(), DATATYPE, tag1,
                            UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // The first request was not offloaded due to small size and the second
    // is blocked by the first one.
    EXPECT_EQ(2u, receiver().worker()->tm.expected.sw_all_count);

    req = recv_nb_and_check(&recvbuf[0], recvbuf.size(), DATATYPE, tag2,
                            UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Check that another request with different tag is offloaded.
    EXPECT_EQ(2u, receiver().worker()->tm.expected.sw_all_count);

    for (std::vector<request*>::const_iterator iter = reqs.begin();
         iter != reqs.end(); ++iter) {
        req_cancel(receiver(), *iter);
    }
}
199
200 UCS_TEST_P(test_ucp_tag_offload, force_thresh_basic, "TM_FORCE_THRESH=4k",
201 "TM_THRESH=1k")
202 {
203 uint64_t small_val = 0xFAFA;
204 const size_t big_size = 5000;
205 int num_reqs = 8;
206 int tag = 0x11;
207 std::vector<request*> reqs;
208 request *req;
209
210 activate_offload(sender());
211
212 for (int i = 0; i < num_reqs - 1; ++i) {
213 req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
214 tag, UCP_TAG_MASK_FULL);
215 reqs.push_back(req);
216 }
217
218 // No requests should be posted to the transport, because their sizes less
219 // than TM_THRESH
220 EXPECT_EQ((unsigned)(num_reqs - 1), receiver().worker()->tm.expected.sw_all_count);
221
222 std::vector<char> recvbuf_big(big_size, 0);
223
224 req = recv_nb(&recvbuf_big[0], recvbuf_big.size(), DATATYPE, tag,
225 UCP_TAG_MASK_FULL);
226 reqs.push_back(req);
227
228 // Now, all requests should be posted to the transport, because receive
229 // buffer bigger than FORCE_THRESH has been posted
230 EXPECT_EQ((unsigned)0, receiver().worker()->tm.expected.sw_all_count);
231
232 std::vector<request*>::const_iterator iter;
233 for (iter = reqs.begin(); iter != reqs.end(); ++iter) {
234 req_cancel(receiver(), *iter);
235 }
236 }
237
// Check that TM_FORCE_THRESH does not force offload while requests which can
// only be handled in SW (non-contiguous datatype, wildcard tag) are pending
// with the same tag.
UCS_TEST_P(test_ucp_tag_offload, force_thresh_blocked, "TM_FORCE_THRESH=4k",
           "TM_THRESH=1k")
{
    uint64_t small_val = 0xFAFA;
    const size_t big_size = 5000;
    int num_reqs = 8;
    int tag = 0x11;
    std::vector<request*> reqs;
    request *req;
    int i;

    activate_offload(sender());

    // Post several small receives; they stay in SW because their size is
    // below TM_THRESH
    for (i = 0; i < num_reqs - 3; ++i) {
        req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                tag, UCP_TAG_MASK_FULL);
        reqs.push_back(req);
    }

    // Add request with noncontig dt
    std::vector<char> buf(64, 0);
    ucp::data_type_desc_t dt_desc(DATATYPE_IOV, buf.data(), buf.size(), 1);
    req = recv_nb_and_check(dt_desc.buf(), dt_desc.count(), dt_desc.dt(),
                            tag, UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Add request with wildcard tag
    req = recv_nb(&small_val, sizeof(small_val), DATATYPE, tag, 0);
    reqs.push_back(req);

    std::vector<char> recvbuf_big(big_size, 0);
    // Check that offload is not forced while there are uncompleted blocking
    // SW requests with the same tag. Each iteration cancels the last blocking
    // request (wildcard first, then noncontig) and re-checks: the big receive
    // plus all remaining pending ones are still accounted in SW.
    for (i = 0; i < 2; ++i) {
        req = recv_nb_and_check(&recvbuf_big[0], recvbuf_big.size(), DATATYPE, tag,
                                UCP_TAG_MASK_FULL);
        EXPECT_EQ((unsigned)(num_reqs - i), receiver().worker()->tm.expected.sw_all_count);
        req_cancel(receiver(), req);

        req_cancel(receiver(), reqs.back());
        reqs.pop_back();
    }

    req = recv_nb(&recvbuf_big[0], recvbuf_big.size(), DATATYPE, tag,
                  UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Now, all requests should be posted to the transport, because receive
    // buffer bigger than FORCE_THRESH has been posted
    EXPECT_EQ((unsigned)0, receiver().worker()->tm.expected.sw_all_count);

    std::vector<request*>::const_iterator iter;
    for (iter = reqs.begin(); iter != reqs.end(); ++iter) {
        req_cancel(receiver(), *iter);
    }
}
294
// Check that worker will not try to connect tag offload capable iface with
// the peer which does not support tag offload (e.g CX-5 and CX-4). In this
// case connection attempt should fail (due to peer unreachable) or some other
// transport should be selected (if available). Otherwise connect can hang,
// because some transports (e.g. rcx) have different ep address type for
// interfaces which support tag_offload.
UCS_TEST_P(test_ucp_tag_offload, connect)
{
    // Disable RC tag matching for the entity created below, to emulate a
    // peer without tag offload support
    m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "n"));

    entity *e = create_entity(true);
    // Should be:
    // - either complete ok
    // - or force skipping the test (because peer is unreachable)
    e->connect(&receiver(), get_ep_params());
}
311
// Check zero- and one-byte messages when every message goes through the
// rendezvous protocol (RNDV_THRESH=0) with HW tag matching enabled.
UCS_TEST_P(test_ucp_tag_offload, small_rndv, "RNDV_THRESH=0", "TM_THRESH=0")
{
    activate_offload(sender());
    send_recv(sender(), 0x11ul, 0ul); // zero-length message
    send_recv(sender(), 0x11ul, 1ul); // single-byte message
}
318
// Same as small_rndv, but with rendezvous forced to the SW protocol
// (TM_SW_RNDV=y).
UCS_TEST_P(test_ucp_tag_offload, small_sw_rndv, "RNDV_THRESH=0", "TM_THRESH=0",
           "TM_SW_RNDV=y")
{
    activate_offload(sender());
    send_recv(sender(), 0x11ul, 0ul); // zero-length message
    send_recv(sender(), 0x11ul, 1ul); // single-byte message
}
326
327 UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload)
328
329
330 class test_ucp_tag_offload_multi : public test_ucp_tag_offload {
331 public:
332
get_ctx_params()333 static ucp_params_t get_ctx_params()
334 {
335 ucp_params_t params = test_ucp_tag::get_ctx_params();
336 params.field_mask |= UCP_PARAM_FIELD_TAG_SENDER_MASK;
337 params.tag_sender_mask = TAG_SENDER;
338 return params;
339 }
340
init()341 void init()
342 {
343 // The test checks that increase of active ifaces is handled
344 // correctly. It needs to start with a single active iface, therefore
345 // disable multi-rail.
346 modify_config("MAX_EAGER_LANES", "1");
347 modify_config("MAX_RNDV_LANES", "1");
348
349 test_ucp_tag_offload::init();
350
351 // TODO: add more tls which support tag offloading
352 std::vector<std::string> tls;
353 tls.push_back("dc_x");
354 tls.push_back("rc_x");
355 ucp_test_param params = GetParam();
356
357 // Create new entity and add to to the end of vector
358 // (thus it will be receiver without any connections)
359 create_entity(false);
360 for (std::vector<std::string>::const_iterator i = tls.begin();
361 i != tls.end(); ++i) {
362 params.transports.clear();
363 params.transports.push_back(*i);
364 create_entity(true, params);
365 sender().connect(&receiver(), get_ep_params());
366 check_offload_support(true);
367 }
368 }
369
make_tag(entity & e,ucp_tag_t t)370 ucp_tag_t make_tag(entity &e, ucp_tag_t t)
371 {
372 uint64_t i;
373
374 for (i = 0; i < m_entities.size(); ++i) {
375 if (&m_entities.at(i) == &e) {
376 break;
377 }
378 }
379 return (i << 48) | t;
380 }
381
activate_offload_hashing(entity & se,ucp_tag_t tag)382 void activate_offload_hashing(entity &se, ucp_tag_t tag)
383 {
384 se.connect(&receiver(), get_ep_params());
385 // Need to send twice:
386 // 1. to ensure that wireup's UCT iface has been closed and
387 // it is not considered for num_active_iface on worker
388 // (message has to be less than `UCX_TM_THRESH` value)
389 // 2. to activate tag ofload
390 // (num_active_ifaces on worker is increased when any message
391 // is received on any iface. Tag hashing is done when we have
392 // more than 1 active ifaces and message has to be greater
393 // than `UCX_TM_THRESH` value)
394 send_recv(se, tag, 8);
395 send_recv(se, tag, 2048);
396 }
397
post_recv_and_check(entity & e,unsigned sw_count,ucp_tag_t tag,ucp_tag_t tag_mask)398 void post_recv_and_check(entity &e, unsigned sw_count, ucp_tag_t tag,
399 ucp_tag_t tag_mask)
400 {
401 std::vector<char> recvbuf(2048, 0);
402 request *req = recv_nb_and_check(&recvbuf, recvbuf.size(), DATATYPE,
403 make_tag(e, tag), UCP_TAG_MASK_FULL);
404
405 EXPECT_EQ(sw_count, receiver().worker()->tm.expected.sw_all_count);
406 req_cancel(receiver(), req);
407 }
408
409
410 protected:
411 static const uint64_t TAG_SENDER = 0xFFFFFFFFFFFF0000;
412 };
413
414
// Verify that sender hashing is engaged once more than one tag-offload iface
// becomes active, and that receives are offloaded only for senders whose
// 'tag & tag_sender_mask' key is present in the hash.
UCS_TEST_P(test_ucp_tag_offload_multi, recv_from_multi)
{
    ucp_tag_t tag = 0x11;

    // Activate first offload iface. Tag hashing is not done yet, since we
    // have only one active iface so far.
    activate_offload_hashing(e(0), make_tag(e(0), tag));
    EXPECT_EQ(0u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Activate second offload iface. The tag has been added to the hash.
    // From now requests will be offloaded only for those tags which are
    // in the hash.
    activate_offload_hashing(e(1), make_tag(e(1), tag));
    EXPECT_EQ(1u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Need to send a message on the first iface again, for its 'tag_sender'
    // part of the tag to be added to the hash.
    send_recv(e(0), make_tag(e(0), tag), 2048);
    EXPECT_EQ(2u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Now requests from first two senders should be always offloaded regardless
    // of the tag value. Tag does not matter, because hashing is done with
    // 'tag & tag_sender_mask' as a key.
    for (int i = 0; i < 2; ++i) {
        post_recv_and_check(e(i), 0u, tag + i, UCP_TAG_MASK_FULL);
    }

    // This request should not be offloaded, because it is sent by the new
    // sender and its 'tag_sender_mask' is not added to the hash yet.
    post_recv_and_check(e(2), 1u, tag, UCP_TAG_MASK_FULL);

    activate_offload_hashing(e(2), make_tag(e(2), tag));
    EXPECT_EQ(3u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Check that this sender was added as well
    post_recv_and_check(e(2), 0u, tag + 1, UCP_TAG_MASK_FULL);
}
452
453 // Do not include SM transports, because they would be selected for tag matching.
454 // And since they do not support TM offload, this test would be skipped.
455 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_multi, all_rcdc, "rc,dc")
456
457
458 class test_ucp_tag_offload_selection : public test_ucp_tag_offload {
459 public:
test_ucp_tag_offload_selection()460 test_ucp_tag_offload_selection() {
461 m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "y"));
462 }
463
get_dev_type(ucp_ep_h ep,ucp_rsc_index_t idx)464 static uct_device_type_t get_dev_type(ucp_ep_h ep, ucp_rsc_index_t idx) {
465 return ep->worker->context->tl_rscs[idx].tl_rsc.dev_type;
466 }
467
lane_shm_or_self(ucp_ep_h ep,ucp_rsc_index_t idx)468 static bool lane_shm_or_self(ucp_ep_h ep, ucp_rsc_index_t idx) {
469 uct_device_type_t dev_type = get_dev_type(ep, idx);
470 return (dev_type == UCT_DEVICE_TYPE_SHM) || (dev_type == UCT_DEVICE_TYPE_SELF);
471 }
472 };
473
// Check that the tag lane is the offload lane only when tag offload is
// possible and no shm/self transport is available.
UCS_TEST_P(test_ucp_tag_offload_selection, tag_lane)
{
    ucp_ep_h ep          = sender().ep();
    bool has_tag_offload = false;
    bool has_shm_or_self = false;

    // Scan all of the sender's transport resources for shm/self devices and
    // for tag-offload capable ifaces.
    for (ucp_rsc_index_t rsc_idx = 0; rsc_idx < sender().ucph()->num_tls;
         ++rsc_idx) {
        if (lane_shm_or_self(ep, rsc_idx)) {
            has_shm_or_self = true;
        }

        uct_iface_attr_t *iface_attr =
                ucp_worker_iface_get_attr(sender().worker(), rsc_idx);
        if (iface_attr->cap.flags & UCT_IFACE_FLAG_TAG_EAGER_BCOPY) {
            // We do not have transports with partial tag offload support
            EXPECT_TRUE(iface_attr->cap.flags & UCT_IFACE_FLAG_TAG_RNDV_ZCOPY);
            has_tag_offload = true;
        }
    }

    ucp_ep_config_t *config = ucp_ep_config(ep);

    if (has_tag_offload && !has_shm_or_self) {
        EXPECT_TRUE(ucp_ep_is_tag_offload_enabled(config));
        EXPECT_EQ(config->key.tag_lane, config->tag.lane);
    } else {
        // If shm or self transports exist they would be used for tag matching
        // rather than network offload
        EXPECT_FALSE(ucp_ep_is_tag_offload_enabled(config));
        EXPECT_EQ(config->key.am_lane, config->tag.lane);
    }
}
505
506 UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload_selection);
507 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_selection, self_rcx,
508 "self,rc_x");
509
510
511 class test_ucp_tag_offload_gpu : public test_ucp_tag_offload {
512 public:
test_ucp_tag_offload_gpu()513 test_ucp_tag_offload_gpu() {
514 modify_config("RNDV_THRESH", "1024");
515 }
516
517 std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)518 static enum_test_params(const ucp_params_t& ctx_params,
519 const std::string& name,
520 const std::string& test_case_name,
521 const std::string& tls)
522 {
523 std::vector<ucp_test_param> result;
524
525 generate_test_params_variant(ctx_params, name, test_case_name,
526 tls, UCS_MEMORY_TYPE_CUDA, result);
527 generate_test_params_variant(ctx_params, name, test_case_name,
528 tls, UCS_MEMORY_TYPE_ROCM, result);
529
530 return result;
531 }
532
533 protected:
mem_type() const534 ucs_memory_type_t mem_type() const {
535 return static_cast<ucs_memory_type_t>(GetParam().variant);
536 }
537 };
538
// Test that expected SW RNDV request is handled properly when receive buffer
// is allocated on GPU memory.
UCS_TEST_P(test_ucp_tag_offload_gpu, sw_rndv_to_gpu_mem, "TM_SW_RNDV=y")
{
    activate_offload(sender());

    size_t size   = 2048;
    ucp_tag_t tag = 0xCAFEBABEul;
    // Test will be skipped here if GPU mem is not supported
    mem_buffer rbuf(size, mem_type());
    // Expected receive into GPU memory (recv_nb_exp verifies it was offloaded)
    request *rreq = recv_nb_exp(rbuf.ptr(), size, DATATYPE, tag,
                                UCP_TAG_MASK_FULL);

    std::vector<uint8_t> sendbuf(size); // can send from any memory
    request *sreq = (request*)ucp_tag_send_nb(sender().ep(), &sendbuf[0],
                                              size, DATATYPE, tag,
                                              send_callback);
    wait_and_validate(rreq);
    wait_and_validate(sreq);
}
559
// Test that small buffers which can be scattered to CQE are not posted to the
// HW. Otherwise it may segfault, while copying data from CQE to the
// (potentially) GPU buffer.
UCS_TEST_P(test_ucp_tag_offload_gpu, rx_scatter_to_cqe, "TM_THRESH=1")
{
    activate_offload(sender());

    size_t size   = 8; // small enough to fit into a CQE
    ucp_tag_t tag = 0xCAFEBABEul;
    // Test will be skipped here if GPU mem is not supported
    mem_buffer rbuf(size, mem_type());
    request *rreq = recv_nb_exp(rbuf.ptr(), size, DATATYPE, tag,
                                UCP_TAG_MASK_FULL);
    uint64_t sbuf = 0ul;
    request *sreq = (request*)ucp_tag_send_nb(sender().ep(), &sbuf, sizeof(sbuf),
                                              DATATYPE, tag, send_callback);
    wait_and_validate(rreq);
    wait_and_validate(sreq);
}
579
580 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_gpu, rc_dc_gpu,
581 "dc_x,rc_x," UCP_TEST_GPU_COPY_TLS)
582
// Fixture for checking that tag offload stays uninitialized when the context
// is created without the UCP_FEATURE_TAG feature.
class test_ucp_tag_offload_status : public test_ucp_tag {
public:
    test_ucp_tag_offload_status() {
        // Explicitly request RC tag matching, so the only reason for it to
        // stay disabled is the missing feature bit below
        m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "y"));
    }

    static ucp_params_t get_ctx_params() {
        ucp_params_t params = ucp_test::get_ctx_params();
        // Do not pass UCP_FEATURE_TAG feature to check that UCT will not
        // initialize tag offload infrastructure in this case.
        params.features = UCP_FEATURE_RMA;
        return params;
    }
};
597
// Without UCP_FEATURE_TAG, no iface may report tag-offload capabilities.
UCS_TEST_P(test_ucp_tag_offload_status, check_offload_status)
{
    const uint64_t tag_offload_flags = UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                       UCT_IFACE_FLAG_TAG_RNDV_ZCOPY;

    for (ucp_rsc_index_t idx = 0; idx < sender().ucph()->num_tls; ++idx) {
        uct_iface_attr_t *attr = ucp_worker_iface_get_attr(sender().worker(),
                                                           idx);
        EXPECT_FALSE(attr->cap.flags & tag_offload_flags);
    }
}
606
607 UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload_status)
608
609 #ifdef ENABLE_STATS
610
611 class test_ucp_tag_offload_stats : public test_ucp_tag_offload_multi {
612 public:
613
init()614 void init()
615 {
616 stats_activate();
617 test_ucp_tag_offload::init(); // No need for multi::init()
618 }
619
cleanup()620 void cleanup()
621 {
622 test_ucp_tag_offload::cleanup();
623 stats_restore();
624 }
625
recv_nb_exp(void * buffer,size_t count,ucp_datatype_t dt,ucp_tag_t tag,ucp_tag_t tag_mask)626 request* recv_nb_exp(void *buffer, size_t count, ucp_datatype_t dt,
627 ucp_tag_t tag, ucp_tag_t tag_mask)
628 {
629 request *req1 = recv_nb_and_check(buffer, count, DATATYPE, tag,
630 UCP_TAG_MASK_FULL);
631
632 // Post and cancel another receive to make sure the first one was offloaded
633 size_t size = receiver().worker()->context->config.ext.tm_thresh + 1;
634 std::vector<char> tbuf(size, 0);
635 request *req2 = recv_nb_and_check(&tbuf[0], size, DATATYPE, tag,
636 UCP_TAG_MASK_FULL);
637 req_cancel(receiver(), req2);
638
639 return req1;
640 }
641
worker_offload_stats(entity & e)642 ucs_stats_node_t* worker_offload_stats(entity &e)
643 {
644 return e.worker()->tm_offload_stats;
645 }
646
validate_offload_counter(uint64_t rx_cntr,uint64_t val)647 void validate_offload_counter(uint64_t rx_cntr, uint64_t val)
648 {
649 uint64_t cnt;
650 cnt = UCS_STATS_GET_COUNTER(worker_offload_stats(receiver()), rx_cntr);
651 EXPECT_EQ(val, cnt);
652 }
653
wait_counter(ucs_stats_node_t * stats,uint64_t cntr,double timeout=ucs::test_timeout_in_sec)654 void wait_counter(ucs_stats_node_t *stats, uint64_t cntr,
655 double timeout = ucs::test_timeout_in_sec)
656 {
657 ucs_time_t deadline = ucs::get_deadline(timeout);
658 uint64_t v;
659
660 do {
661 short_progress_loop();
662 v = UCS_STATS_GET_COUNTER(stats, cntr);
663 } while ((ucs_get_time() < deadline) && !v);
664
665 EXPECT_EQ(1ul, v);
666 }
667
test_send_recv(size_t count,bool send_iov,uint64_t cntr)668 void test_send_recv(size_t count, bool send_iov, uint64_t cntr)
669 {
670 ucp_tag_t tag = 0x11;
671
672 std::vector<char> sbuf(count, 0);
673 std::vector<char> rbuf(count, 0);
674 request *req = recv_nb_exp(rbuf.data(), rbuf.size(), DATATYPE, tag,
675 UCP_TAG_MASK_FULL);
676
677 if (send_iov) {
678 ucp::data_type_desc_t dt_desc(DATATYPE_IOV, sbuf.data(),
679 sbuf.size(), 1);
680 send_b(dt_desc.buf(), dt_desc.count(), dt_desc.dt(), tag);
681 } else {
682 send_b(sbuf.data(), sbuf.size(), DATATYPE, tag);
683 }
684 wait(req);
685 request_free(req);
686
687 validate_offload_counter(cntr, 1ul);
688 }
689 };
690
// Check the POSTED and CANCELED counters for a receive which is big enough
// (above TM_THRESH=128) to be posted to the HW.
UCS_TEST_P(test_ucp_tag_offload_stats, post, "TM_THRESH=128")
{
    uint64_t tag = 0x11;
    std::vector<char> dummy(256, 0); // above TM_THRESH, so it is offloadable

    activate_offload(sender());

    request *rreq = recv_nb(dummy.data(), dummy.size(), DATATYPE, tag,
                            UCP_TAG_MASK_FULL);

    // The receive should be counted as posted to the transport
    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_POSTED);

    req_cancel(receiver(), rreq);

    // ...and as canceled after ucp_request_cancel
    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_CANCELED);
}
709
// Check the BLOCK_* counters: receives which cannot be offloaded due to
// non-contiguous datatype, wildcard tag, or exceeding the HW tag list limit.
UCS_TEST_P(test_ucp_tag_offload_stats, block, "TM_THRESH=128")
{
    uint64_t tag = 0x11;
    std::vector<char> buf(256, 0);

    activate_offload(sender());

    // Check BLOCK_NON_CONTIG
    ucp::data_type_desc_t dt_desc(DATATYPE_IOV, buf.data(), buf.size(), 1);
    request *rreq = recv_nb_and_check(dt_desc.buf(), dt_desc.count(),
                                      dt_desc.dt(), tag, UCP_TAG_MASK_FULL);

    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NON_CONTIG);

    req_cancel(receiver(), rreq);

    // Check BLOCK_WILDCARD (zero tag mask)
    rreq = recv_nb_and_check(buf.data(), buf.size(), DATATYPE, tag, 0);

    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_WILDCARD);

    req_cancel(receiver(), rreq);

    // Check BLOCK_TAG_EXCEED: keep posting offloadable receives until the
    // transport reports that its tag list is exhausted
    std::vector<request*> reqs;
    uint64_t cnt;
    unsigned limit = 1000; // Just a big value to avoid test hang
    do {
        rreq = recv_nb_and_check(buf.data(), buf.size(), DATATYPE, tag,
                                 UCP_TAG_MASK_FULL);
        cnt = UCS_STATS_GET_COUNTER(worker_offload_stats(receiver()),
                                    UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_TAG_EXCEED);
        reqs.push_back(rreq);
    } while (!cnt && (--limit > 0));

    validate_offload_counter(UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_TAG_EXCEED , 1ul);

    for (std::vector<request*>::const_iterator iter = reqs.begin();
         iter != reqs.end(); ++iter) {
        req_cancel(receiver(), *iter);
    }
}
754
755 UCS_TEST_P(test_ucp_tag_offload_stats, eager, "RNDV_THRESH=1000", "TM_THRESH=64")
756 {
757 size_t size = 512; // Size smaller than RNDV, but bigger than TM thresh
758
759 // Offload is not activated, so the first message should arrive unexpectedly
760 test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_EGR);
761 test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED);
762 }
763
764 UCS_TEST_P(test_ucp_tag_offload_stats, rndv, "RNDV_THRESH=1000")
765 {
766 size_t size = 2048; // Size bigger than RNDV thresh
767
768 // Offload is not activated, so the first message should arrive unexpectedly
769 test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_RNDV);
770 test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED);
771 }
772
773 UCS_TEST_P(test_ucp_tag_offload_stats, sw_rndv, "RNDV_THRESH=1000")
774 {
775 size_t size = 2048; // Size bigger than RNDV thresh
776
777 // Offload is not activated, so the first message should arrive unexpectedly
778 test_send_recv(size, true, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_SW_RNDV);
779 test_send_recv(size, true, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED_SW_RNDV);
780 }
781
782 UCS_TEST_P(test_ucp_tag_offload_stats, force_sw_rndv, "TM_SW_RNDV=y",
783 "RNDV_THRESH=1000")
784 {
785 size_t size = 2048; // Size bigger than RNDV thresh
786
787 // Offload is not activated, so the first message should arrive unexpectedly
788 test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_SW_RNDV);
789 test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED_SW_RNDV);
790 }
791
792
793 UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload_stats)
794
795
796 class test_ucp_tag_offload_stats_gpu : public test_ucp_tag_offload_stats {
797 public:
test_ucp_tag_offload_stats_gpu()798 test_ucp_tag_offload_stats_gpu() {
799 m_env.push_back(new ucs::scoped_setenv("UCX_IB_GPU_DIRECT_RDMA", "n"));
800 }
801
802 std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)803 static enum_test_params(const ucp_params_t& ctx_params,
804 const std::string& name,
805 const std::string& test_case_name,
806 const std::string& tls)
807 {
808 std::vector<ucp_test_param> result;
809
810 generate_test_params_variant(ctx_params, name, test_case_name,
811 tls, UCS_MEMORY_TYPE_CUDA, result);
812 generate_test_params_variant(ctx_params, name, test_case_name,
813 tls, UCS_MEMORY_TYPE_ROCM, result);
814
815 return result;
816 }
817
818 protected:
mem_type() const819 ucs_memory_type_t mem_type() const {
820 return static_cast<ucs_memory_type_t>(GetParam().variant);
821 }
822 };
823
// Check that a receive into GPU memory is blocked from HW posting when GPU
// Direct RDMA is disabled (accounted via the BLOCK_MEM_REG counter).
UCS_TEST_P(test_ucp_tag_offload_stats_gpu, block_gpu_no_gpu_direct,
           "TM_THRESH=128")
{
    activate_offload(sender());

    size_t size = 2048;
    // Test will be skipped here if GPU mem is not supported
    mem_buffer rbuf(size, mem_type());
    request *rreq = recv_nb_and_check(rbuf.ptr(), size, DATATYPE, 0x11,
                                      UCP_TAG_MASK_FULL);

    // The request must be accounted as blocked by memory registration...
    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_MEM_REG);

    // ...and nothing should have been posted to the HW
    validate_offload_counter(UCP_WORKER_STAT_TAG_OFFLOAD_POSTED, 0ul);

    req_cancel(receiver(), rreq);
}
842
843 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_stats_gpu, rc_dc_gpu,
844 "dc_x,rc_x," UCP_TEST_GPU_COPY_TLS)
845
846 #endif
847