1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2017-2019.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #include "test_ucp_tag.h"
8 
9 #include "ucp_datatype.h"
10 
11 extern "C" {
12 #include <ucp/core/ucp_ep.inl>
13 #include <ucp/core/ucp_worker.h>
14 #include <ucp/tag/tag_match.h>
15 }
16 
/* Instantiate a test case for the transports that support HW tag-matching
 * offload (DC and RC accelerated transports). */
#define UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(_test_case) \
    UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, dcx, "dc_x") \
    UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, rcx, "rc_x")
20 
21 class test_ucp_tag_offload : public test_ucp_tag {
22 public:
test_ucp_tag_offload()23     test_ucp_tag_offload() {
24         // TODO: test offload and offload MP as different variants
25         enable_tag_mp_offload();
26     }
27 
init()28     void init()
29     {
30         test_ucp_tag::init();
31         check_offload_support(true);
32     }
33 
recv_nb_and_check(void * buffer,size_t count,ucp_datatype_t dt,ucp_tag_t tag,ucp_tag_t tag_mask)34     request* recv_nb_and_check(void *buffer, size_t count, ucp_datatype_t dt,
35                                ucp_tag_t tag, ucp_tag_t tag_mask)
36     {
37         request *req = recv_nb(buffer, count, dt, tag, tag_mask);
38         EXPECT_TRUE(!UCS_PTR_IS_ERR(req));
39         EXPECT_TRUE(req != NULL);
40         return req;
41     }
42 
recv_nb_exp(void * buffer,size_t count,ucp_datatype_t dt,ucp_tag_t tag,ucp_tag_t tag_mask)43     request* recv_nb_exp(void *buffer, size_t count, ucp_datatype_t dt,
44                          ucp_tag_t tag, ucp_tag_t tag_mask)
45     {
46         request *req1 = recv_nb_and_check(buffer, count, DATATYPE, tag,
47                                           UCP_TAG_MASK_FULL);
48 
49         // Post and cancel another receive to make sure the first one was offloaded
50         size_t size = receiver().worker()->context->config.ext.tm_thresh + 1;
51         std::vector<char> tbuf(size, 0);
52         request *req2 = recv_nb_and_check(&tbuf[0], size, DATATYPE, tag,
53                                           UCP_TAG_MASK_FULL);
54         req_cancel(receiver(), req2);
55 
56         return req1;
57     }
58 
send_recv(entity & se,ucp_tag_t tag,size_t length)59     void send_recv(entity &se, ucp_tag_t tag, size_t length)
60     {
61         std::vector<uint8_t> sendbuf(length);
62         std::vector<uint8_t> recvbuf(length);
63 
64         request *rreq = recv_nb_exp(&recvbuf[0], length, DATATYPE, tag,
65                                     UCP_TAG_MASK_FULL);
66 
67         request *sreq = (request*)ucp_tag_send_nb(se.ep(), &sendbuf[0], length,
68                                                   DATATYPE, tag, send_callback);
69         if (UCS_PTR_IS_ERR(sreq)) {
70             ASSERT_UCS_OK(UCS_PTR_STATUS(sreq));
71         } else if (sreq != NULL) {
72             wait(sreq);
73             request_free(sreq);
74         }
75 
76         wait(rreq);
77         request_free(rreq);
78     }
79 
activate_offload(entity & se,ucp_tag_t tag=0x11)80     void activate_offload(entity &se, ucp_tag_t tag = 0x11)
81     {
82         send_recv(se, tag, receiver().worker()->context->config.ext.tm_thresh);
83     }
84 
req_cancel(entity & e,request * req)85     void req_cancel(entity &e, request *req)
86     {
87         ucp_request_cancel(e.worker(), req);
88         wait(req);
89         request_free(req);
90     }
91 };
92 
// A small SW receive must be removed from the SW expected-queue accounting
// when canceled, and a later large receive must be offloaded (not counted
// in SW).
UCS_TEST_P(test_ucp_tag_offload, post_after_cancel)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag      = 0x11;
    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    request *req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                     tag, UCP_TAG_MASK_FULL);

    EXPECT_EQ(1u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req);
    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);

    // Fix: pass the vector's data pointer, not the address of the vector
    // object itself
    req = recv_nb_and_check(recvbuf.data(), recvbuf.size(), DATATYPE, tag,
                            UCP_TAG_MASK_FULL);

    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req);
}
114 
// Same as post_after_cancel, but the small SW request is completed by an
// actual matching send instead of being canceled.
UCS_TEST_P(test_ucp_tag_offload, post_after_comp)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag      = 0x11;
    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    request *req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                     tag, UCP_TAG_MASK_FULL);

    EXPECT_EQ(1u, receiver().worker()->tm.expected.sw_all_count);

    send_b(&small_val, sizeof(small_val), DATATYPE, tag);
    wait(req);
    request_free(req);
    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);

    // Fix: pass the vector's data pointer, not the address of the vector
    // object itself
    req = recv_nb_and_check(recvbuf.data(), recvbuf.size(), DATATYPE, tag,
                            UCP_TAG_MASK_FULL);

    EXPECT_EQ(0u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req);
}
139 
// A wildcard receive (tag_mask == 0) must be handled in SW and must also
// block subsequent receives from being offloaded, even with a different tag.
UCS_TEST_P(test_ucp_tag_offload, post_wild)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag1     = 0x11; // these two tags should go to different
    ucp_tag_t tag2     = 0x13; // hash buckets in the TM expected queue
    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    request *req1 = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                      tag1, 0);
    EXPECT_EQ(1u, receiver().worker()->tm.expected.sw_all_count);

    // Fix: pass the vector's data pointer, not the address of the vector
    // object itself
    request *req2 = recv_nb_and_check(recvbuf.data(), recvbuf.size(), DATATYPE,
                                      tag2, UCP_TAG_MASK_FULL);
    // Second request should not be posted as well. Even though it has another
    // tag, the first request is a wildcard, which needs to be handled in SW,
    // so it blocks all other requests
    EXPECT_EQ(2u, receiver().worker()->tm.expected.sw_all_count);
    req_cancel(receiver(), req1);
    req_cancel(receiver(), req2);
}
162 
// A small (non-offloadable) receive blocks only requests in its own hash
// bucket; a request with a tag hashing to a different bucket is offloaded.
UCS_TEST_P(test_ucp_tag_offload, post_dif_buckets)
{
    uint64_t small_val = 0xFAFA;
    ucp_tag_t tag1     = 0x11; // these two tags should go to different
    ucp_tag_t tag2     = 0x13; // hash buckets in the TM expected queue
    std::vector<request*> reqs;
    request *req;

    std::vector<char> recvbuf(2048, 0);

    activate_offload(sender());

    req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE, tag1,
                            UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Fix (here and below): pass the vector's data pointer, not the address
    // of the vector object itself
    req = recv_nb_and_check(recvbuf.data(), recvbuf.size(), DATATYPE, tag1,
                            UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // The first request was not offloaded due to small size and the second
    // is blocked by the first one.
    EXPECT_EQ(2u, receiver().worker()->tm.expected.sw_all_count);

    req = recv_nb_and_check(recvbuf.data(), recvbuf.size(), DATATYPE, tag2,
                            UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Check that another request with different tag is offloaded.
    EXPECT_EQ(2u, receiver().worker()->tm.expected.sw_all_count);

    for (std::vector<request*>::const_iterator iter = reqs.begin();
         iter != reqs.end(); ++iter) {
        req_cancel(receiver(), *iter);
    }
}
199 
200 UCS_TEST_P(test_ucp_tag_offload, force_thresh_basic, "TM_FORCE_THRESH=4k",
201                                                      "TM_THRESH=1k")
202 {
203     uint64_t small_val      = 0xFAFA;
204     const size_t big_size   = 5000;
205     int num_reqs            = 8;
206     int tag                 = 0x11;
207     std::vector<request*> reqs;
208     request *req;
209 
210     activate_offload(sender());
211 
212     for (int i = 0; i < num_reqs - 1; ++i) {
213         req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
214                                 tag, UCP_TAG_MASK_FULL);
215         reqs.push_back(req);
216     }
217 
218     // No requests should be posted to the transport, because their sizes less
219     // than TM_THRESH
220     EXPECT_EQ((unsigned)(num_reqs - 1), receiver().worker()->tm.expected.sw_all_count);
221 
222     std::vector<char> recvbuf_big(big_size, 0);
223 
224     req = recv_nb(&recvbuf_big[0], recvbuf_big.size(), DATATYPE, tag,
225                   UCP_TAG_MASK_FULL);
226     reqs.push_back(req);
227 
228     // Now, all requests should be posted to the transport, because receive
229     // buffer bigger than FORCE_THRESH has been posted
230     EXPECT_EQ((unsigned)0, receiver().worker()->tm.expected.sw_all_count);
231 
232     std::vector<request*>::const_iterator iter;
233     for (iter = reqs.begin(); iter != reqs.end(); ++iter) {
234         req_cancel(receiver(), *iter);
235     }
236 }
237 
// Forced offload (receive bigger than TM_FORCE_THRESH) must NOT happen while
// requests that can only be handled in SW (noncontig datatype, wildcard tag)
// are pending with the same tag; once they are removed, offload is forced.
UCS_TEST_P(test_ucp_tag_offload, force_thresh_blocked, "TM_FORCE_THRESH=4k",
                                                       "TM_THRESH=1k")
{
    uint64_t small_val      = 0xFAFA;
    const size_t big_size   = 5000;
    int num_reqs            = 8;
    int tag                 = 0x11;
    std::vector<request*> reqs;
    request *req;
    int i;

    activate_offload(sender());

    // Post small receives which stay in SW (below TM_THRESH)
    for (i = 0; i < num_reqs - 3; ++i) {
        req = recv_nb_and_check(&small_val, sizeof(small_val), DATATYPE,
                                tag, UCP_TAG_MASK_FULL);
        reqs.push_back(req);
    }

    // Add request with noncontig dt
    std::vector<char> buf(64, 0);
    ucp::data_type_desc_t dt_desc(DATATYPE_IOV, buf.data(), buf.size(), 1);
    req = recv_nb_and_check(dt_desc.buf(), dt_desc.count(), dt_desc.dt(),
                            tag, UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Add request with wildcard tag
    req = recv_nb(&small_val, sizeof(small_val), DATATYPE, tag, 0);
    reqs.push_back(req);

    std::vector<char> recvbuf_big(big_size, 0);
    // Check that offload is not forced while there are uncompleted blocking
    // SW requests with the same tag
    for (i = 0; i < 2; ++i) {
        // All previously posted requests plus this big one are still in SW
        req = recv_nb_and_check(&recvbuf_big[0], recvbuf_big.size(), DATATYPE, tag,
                                UCP_TAG_MASK_FULL);
        EXPECT_EQ((unsigned)(num_reqs - i), receiver().worker()->tm.expected.sw_all_count);
        req_cancel(receiver(), req);

        // Remove one blocker (first the wildcard, then the noncontig
        // request - they were pushed last) and probe again
        req_cancel(receiver(), reqs.back());
        reqs.pop_back();
    }

    req = recv_nb(&recvbuf_big[0], recvbuf_big.size(), DATATYPE, tag,
                  UCP_TAG_MASK_FULL);
    reqs.push_back(req);

    // Now, all requests should be posted to the transport, because receive
    // buffer bigger than FORCE_THRESH has been posted
    EXPECT_EQ((unsigned)0, receiver().worker()->tm.expected.sw_all_count);

    std::vector<request*>::const_iterator iter;
    for (iter = reqs.begin(); iter != reqs.end(); ++iter) {
        req_cancel(receiver(), *iter);
    }
}
294 
// Check that worker will not try to connect tag offload capable iface with
// the peer which does not support tag offload (e.g CX-5 and CX-4). In this
// case connection attempt should fail (due to peer unreachable) or some other
// transport should be selected (if available). Otherwise connect can hang,
// because some transports (e.g. rcx) have different ep address type for
// interfaces which support tag_offload.
UCS_TEST_P(test_ucp_tag_offload, connect)
{
    // Disable tag offload on the new entity only; the receiver keeps it on
    m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "n"));

    entity *e = create_entity(true);
    // Should be:
    // - either complete ok
    // - or force skipping the test (because peer is unreachable)
    e->connect(&receiver(), get_ep_params());
}
311 
// RNDV_THRESH=0 forces the rendezvous protocol even for tiny messages;
// check that 0- and 1-byte rendezvous work with offload active.
UCS_TEST_P(test_ucp_tag_offload, small_rndv, "RNDV_THRESH=0", "TM_THRESH=0")
{
    activate_offload(sender());
    send_recv(sender(), 0x11ul, 0ul); // zero-length message
    send_recv(sender(), 0x11ul, 1ul); // single-byte message
}
318 
// Same as small_rndv, but with TM_SW_RNDV=y the rendezvous request is
// handled in SW rather than by the HW rendezvous offload.
UCS_TEST_P(test_ucp_tag_offload, small_sw_rndv, "RNDV_THRESH=0", "TM_THRESH=0",
                                                "TM_SW_RNDV=y")
{
    activate_offload(sender());
    send_recv(sender(), 0x11ul, 0ul); // zero-length message
    send_recv(sender(), 0x11ul, 1ul); // single-byte message
}
326 
327 UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload)
328 
329 
330 class test_ucp_tag_offload_multi : public test_ucp_tag_offload {
331 public:
332 
get_ctx_params()333     static ucp_params_t get_ctx_params()
334     {
335         ucp_params_t params    = test_ucp_tag::get_ctx_params();
336         params.field_mask     |= UCP_PARAM_FIELD_TAG_SENDER_MASK;
337         params.tag_sender_mask = TAG_SENDER;
338         return params;
339     }
340 
init()341     void init()
342     {
343         // The test checks that increase of active ifaces is handled
344         // correctly. It needs to start with a single active iface, therefore
345         // disable multi-rail.
346         modify_config("MAX_EAGER_LANES", "1");
347         modify_config("MAX_RNDV_LANES",  "1");
348 
349         test_ucp_tag_offload::init();
350 
351         // TODO: add more tls which support tag offloading
352         std::vector<std::string> tls;
353         tls.push_back("dc_x");
354         tls.push_back("rc_x");
355         ucp_test_param params = GetParam();
356 
357         // Create new entity and add to to the end of vector
358         // (thus it will be receiver without any connections)
359         create_entity(false);
360         for (std::vector<std::string>::const_iterator i = tls.begin();
361              i != tls.end(); ++i) {
362             params.transports.clear();
363             params.transports.push_back(*i);
364             create_entity(true, params);
365             sender().connect(&receiver(), get_ep_params());
366             check_offload_support(true);
367         }
368     }
369 
make_tag(entity & e,ucp_tag_t t)370     ucp_tag_t make_tag(entity &e, ucp_tag_t t)
371     {
372         uint64_t i;
373 
374         for (i = 0; i < m_entities.size(); ++i) {
375              if (&m_entities.at(i) == &e) {
376                  break;
377              }
378         }
379         return (i << 48) | t;
380     }
381 
activate_offload_hashing(entity & se,ucp_tag_t tag)382     void activate_offload_hashing(entity &se, ucp_tag_t tag)
383     {
384         se.connect(&receiver(), get_ep_params());
385         // Need to send twice:
386         // 1. to ensure that wireup's UCT iface has been closed and
387         //    it is not considered for num_active_iface on worker
388         //    (message has to be less than `UCX_TM_THRESH` value)
389         // 2. to activate tag ofload
390         //    (num_active_ifaces on worker is increased when any message
391         //     is received on any iface. Tag hashing is done when we have
392         //     more than 1 active ifaces and message has to be greater
393         //     than `UCX_TM_THRESH` value)
394         send_recv(se, tag, 8);
395         send_recv(se, tag, 2048);
396     }
397 
post_recv_and_check(entity & e,unsigned sw_count,ucp_tag_t tag,ucp_tag_t tag_mask)398     void post_recv_and_check(entity &e, unsigned sw_count, ucp_tag_t tag,
399                              ucp_tag_t tag_mask)
400     {
401         std::vector<char> recvbuf(2048, 0);
402         request *req = recv_nb_and_check(&recvbuf, recvbuf.size(), DATATYPE,
403                                          make_tag(e, tag), UCP_TAG_MASK_FULL);
404 
405         EXPECT_EQ(sw_count, receiver().worker()->tm.expected.sw_all_count);
406         req_cancel(receiver(), req);
407     }
408 
409 
410 protected:
411     static const uint64_t TAG_SENDER = 0xFFFFFFFFFFFF0000;
412 };
413 
414 
// Verify tag hashing over multiple active offload ifaces: requests are
// offloaded only for sender-tags already present in the offload hash, and
// the hash grows as each sender becomes active.
UCS_TEST_P(test_ucp_tag_offload_multi, recv_from_multi)
{
    ucp_tag_t tag = 0x11;

    // Activate first offload iface. Tag hashing is not done yet, since we
    // have only one active iface so far.
    activate_offload_hashing(e(0), make_tag(e(0), tag));
    EXPECT_EQ(0u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Activate second offload iface. The tag has been added to the hash.
    // From now requests will be offloaded only for those tags which are
    // in the hash.
    activate_offload_hashing(e(1), make_tag(e(1), tag));
    EXPECT_EQ(1u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Need to send a message on the first iface again, for its 'tag_sender'
    // part of the tag to be added to the hash.
    send_recv(e(0), make_tag(e(0), tag), 2048);
    EXPECT_EQ(2u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Now requests from first two senders should be always offloaded regardless
    // of the tag value. Tag does not matter, because hashing is done with
    // 'tag & tag_sender_mask' as a key.
    for (int i = 0; i < 2; ++i) {
        post_recv_and_check(e(i), 0u, tag + i, UCP_TAG_MASK_FULL);
    }

    // This request should not be offloaded, because it is sent by the new
    // sender and its 'tag_sender_mask' is not added to the hash yet.
    post_recv_and_check(e(2), 1u, tag, UCP_TAG_MASK_FULL);

    activate_offload_hashing(e(2), make_tag(e(2), tag));
    EXPECT_EQ(3u, kh_size(&receiver().worker()->tm.offload.tag_hash));

    // Check that this sender was added as well
    post_recv_and_check(e(2), 0u, tag + 1, UCP_TAG_MASK_FULL);
}
452 
// Do not include SM transports, because they would be selected for tag
// matching. And since they do not support TM offload, this test would be
// skipped.
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_multi, all_rcdc, "rc,dc")
456 
457 
458 class test_ucp_tag_offload_selection : public test_ucp_tag_offload {
459 public:
test_ucp_tag_offload_selection()460     test_ucp_tag_offload_selection() {
461         m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "y"));
462     }
463 
get_dev_type(ucp_ep_h ep,ucp_rsc_index_t idx)464     static uct_device_type_t get_dev_type(ucp_ep_h ep, ucp_rsc_index_t idx) {
465         return ep->worker->context->tl_rscs[idx].tl_rsc.dev_type;
466     }
467 
lane_shm_or_self(ucp_ep_h ep,ucp_rsc_index_t idx)468     static bool lane_shm_or_self(ucp_ep_h ep, ucp_rsc_index_t idx) {
469         uct_device_type_t dev_type = get_dev_type(ep, idx);
470         return (dev_type == UCT_DEVICE_TYPE_SHM) || (dev_type == UCT_DEVICE_TYPE_SELF);
471     }
472 };
473 
// Check the endpoint's tag-lane selection: a dedicated tag lane is chosen
// only when a tag-offload capable transport exists and no SHM/SELF
// transport is available; otherwise the AM lane is used for tag matching.
UCS_TEST_P(test_ucp_tag_offload_selection, tag_lane)
{
    ucp_ep_h ep          = sender().ep();
    bool has_tag_offload = false;
    bool has_shm_or_self = false;

    for (ucp_rsc_index_t idx = 0; idx < sender().ucph()->num_tls; ++idx) {
        if (lane_shm_or_self(ep, idx)) {
            has_shm_or_self = true;
        }

        uct_iface_attr_t *attr = ucp_worker_iface_get_attr(sender().worker(), idx);
        if (attr->cap.flags & UCT_IFACE_FLAG_TAG_EAGER_BCOPY) {
            // We do not have transports with partial tag offload support
            EXPECT_TRUE(attr->cap.flags & UCT_IFACE_FLAG_TAG_RNDV_ZCOPY);
            has_tag_offload = true;
        }
    }

    ucp_ep_config_t *ep_config = ucp_ep_config(ep);

    if (has_tag_offload && !has_shm_or_self) {
        EXPECT_TRUE(ucp_ep_is_tag_offload_enabled(ep_config));
        EXPECT_EQ(ep_config->key.tag_lane, ep_config->tag.lane);
    } else {
        // If shm or self transports exist they would be used for tag matching
        // rather than network offload
        EXPECT_FALSE(ucp_ep_is_tag_offload_enabled(ep_config));
        EXPECT_EQ(ep_config->key.am_lane, ep_config->tag.lane);
    }
}
505 
// Instantiate on offload-capable transports, plus a self+rc_x combination
// to exercise the "SHM/SELF present" branch of the tag_lane test
UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload_selection);
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_selection, self_rcx,
                              "self,rc_x");
509 
510 
511 class test_ucp_tag_offload_gpu : public test_ucp_tag_offload {
512 public:
test_ucp_tag_offload_gpu()513     test_ucp_tag_offload_gpu() {
514         modify_config("RNDV_THRESH", "1024");
515     }
516 
517     std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)518     static enum_test_params(const ucp_params_t& ctx_params,
519                             const std::string& name,
520                             const std::string& test_case_name,
521                             const std::string& tls)
522     {
523         std::vector<ucp_test_param> result;
524 
525         generate_test_params_variant(ctx_params, name, test_case_name,
526                                      tls, UCS_MEMORY_TYPE_CUDA, result);
527         generate_test_params_variant(ctx_params, name, test_case_name,
528                                      tls, UCS_MEMORY_TYPE_ROCM, result);
529 
530         return result;
531     }
532 
533 protected:
mem_type() const534     ucs_memory_type_t mem_type() const {
535         return static_cast<ucs_memory_type_t>(GetParam().variant);
536     }
537 };
538 
// Test that expected SW RNDV request is handled properly when receive buffer
// is allocated on GPU memory.
UCS_TEST_P(test_ucp_tag_offload_gpu, sw_rndv_to_gpu_mem, "TM_SW_RNDV=y")
{
    activate_offload(sender());

    size_t size   = 2048;
    ucp_tag_t tag = 0xCAFEBABEul;
    // Test will be skipped here if GPU mem is not supported
    mem_buffer rbuf(size, mem_type());
    request *rreq = recv_nb_exp(rbuf.ptr(), size, DATATYPE, tag,
                                UCP_TAG_MASK_FULL);

    std::vector<uint8_t> sendbuf(size); // can send from any memory
    request *sreq = (request*)ucp_tag_send_nb(sender().ep(), &sendbuf[0],
                                              size, DATATYPE, tag,
                                              send_callback);
    wait_and_validate(rreq);
    wait_and_validate(sreq);
}
559 
// Test that small buffers wich can be scattered to CQE are not posted to the
// HW. Otherwise it may segfault, while copying data from CQE to the
// (potentially) GPU buffer.
UCS_TEST_P(test_ucp_tag_offload_gpu, rx_scatter_to_cqe, "TM_THRESH=1")
{
    activate_offload(sender());

    size_t size   = 8;
    ucp_tag_t tag = 0xCAFEBABEul;
    // Test will be skipped here if GPU mem is not supported
    mem_buffer rbuf(size, mem_type());
    request *rreq = recv_nb_exp(rbuf.ptr(), size, DATATYPE, tag,
                                UCP_TAG_MASK_FULL);
    uint64_t sbuf = 0ul;
    request *sreq = (request*)ucp_tag_send_nb(sender().ep(), &sbuf, sizeof(sbuf),
                                              DATATYPE, tag, send_callback);
    wait_and_validate(rreq);
    wait_and_validate(sreq);
}
579 
// Instantiate on offload transports together with a GPU copy transport
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_gpu, rc_dc_gpu,
                              "dc_x,rc_x," UCP_TEST_GPU_COPY_TLS)
582 
583 class test_ucp_tag_offload_status : public test_ucp_tag {
584 public:
test_ucp_tag_offload_status()585     test_ucp_tag_offload_status() {
586         m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_ENABLE", "y"));
587     }
588 
get_ctx_params()589     static ucp_params_t get_ctx_params() {
590         ucp_params_t params = ucp_test::get_ctx_params();
591         // Do not pass UCP_FEATURE_TAG feature to check that UCT will not
592         // initialize tag offload infrastructure in this case.
593         params.features     = UCP_FEATURE_RMA;
594         return params;
595     }
596 };
597 
UCS_TEST_P(test_ucp_tag_offload_status, check_offload_status)
{
    // Without UCP_FEATURE_TAG no iface may advertise tag-offload capabilities
    for (ucp_rsc_index_t idx = 0; idx < sender().ucph()->num_tls; ++idx) {
        uct_iface_attr_t *attr = ucp_worker_iface_get_attr(sender().worker(),
                                                           idx);
        EXPECT_FALSE(attr->cap.flags & (UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                        UCT_IFACE_FLAG_TAG_RNDV_ZCOPY));
    }
}
606 
607 UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload_status)
608 
609 #ifdef ENABLE_STATS
610 
611 class test_ucp_tag_offload_stats : public test_ucp_tag_offload_multi {
612 public:
613 
init()614     void init()
615     {
616         stats_activate();
617         test_ucp_tag_offload::init(); // No need for multi::init()
618     }
619 
cleanup()620     void cleanup()
621     {
622         test_ucp_tag_offload::cleanup();
623         stats_restore();
624     }
625 
recv_nb_exp(void * buffer,size_t count,ucp_datatype_t dt,ucp_tag_t tag,ucp_tag_t tag_mask)626     request* recv_nb_exp(void *buffer, size_t count, ucp_datatype_t dt,
627                          ucp_tag_t tag, ucp_tag_t tag_mask)
628     {
629         request *req1 = recv_nb_and_check(buffer, count, DATATYPE, tag,
630                                           UCP_TAG_MASK_FULL);
631 
632         // Post and cancel another receive to make sure the first one was offloaded
633         size_t size = receiver().worker()->context->config.ext.tm_thresh + 1;
634         std::vector<char> tbuf(size, 0);
635         request *req2 = recv_nb_and_check(&tbuf[0], size, DATATYPE, tag,
636                                           UCP_TAG_MASK_FULL);
637         req_cancel(receiver(), req2);
638 
639         return req1;
640     }
641 
worker_offload_stats(entity & e)642     ucs_stats_node_t* worker_offload_stats(entity &e)
643     {
644         return e.worker()->tm_offload_stats;
645     }
646 
validate_offload_counter(uint64_t rx_cntr,uint64_t val)647     void validate_offload_counter(uint64_t rx_cntr, uint64_t val)
648     {
649         uint64_t cnt;
650         cnt = UCS_STATS_GET_COUNTER(worker_offload_stats(receiver()), rx_cntr);
651         EXPECT_EQ(val, cnt);
652     }
653 
wait_counter(ucs_stats_node_t * stats,uint64_t cntr,double timeout=ucs::test_timeout_in_sec)654     void wait_counter(ucs_stats_node_t *stats, uint64_t cntr,
655                       double timeout = ucs::test_timeout_in_sec)
656     {
657         ucs_time_t deadline = ucs::get_deadline(timeout);
658         uint64_t   v;
659 
660         do {
661             short_progress_loop();
662             v = UCS_STATS_GET_COUNTER(stats, cntr);
663         } while ((ucs_get_time() < deadline) && !v);
664 
665         EXPECT_EQ(1ul, v);
666     }
667 
test_send_recv(size_t count,bool send_iov,uint64_t cntr)668     void test_send_recv(size_t count, bool send_iov, uint64_t cntr)
669     {
670         ucp_tag_t tag = 0x11;
671 
672         std::vector<char> sbuf(count, 0);
673         std::vector<char> rbuf(count, 0);
674         request *req = recv_nb_exp(rbuf.data(), rbuf.size(), DATATYPE, tag,
675                                    UCP_TAG_MASK_FULL);
676 
677         if (send_iov) {
678             ucp::data_type_desc_t dt_desc(DATATYPE_IOV, sbuf.data(),
679                                           sbuf.size(), 1);
680             send_b(dt_desc.buf(), dt_desc.count(), dt_desc.dt(), tag);
681         } else {
682             send_b(sbuf.data(), sbuf.size(), DATATYPE, tag);
683         }
684         wait(req);
685         request_free(req);
686 
687         validate_offload_counter(cntr, 1ul);
688     }
689 };
690 
// A receive above TM_THRESH must bump the POSTED counter when handed to the
// HW, and the CANCELED counter when it is canceled.
UCS_TEST_P(test_ucp_tag_offload_stats, post, "TM_THRESH=128")
{
    uint64_t tag = 0x11;
    std::vector<char> dummy(256, 0); // 256 > TM_THRESH, so it gets offloaded

    activate_offload(sender());

    request *rreq = recv_nb(dummy.data(), dummy.size(), DATATYPE, tag,
                            UCP_TAG_MASK_FULL);

    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_POSTED);

    req_cancel(receiver(), rreq);

    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_CANCELED);
}
709 
710 UCS_TEST_P(test_ucp_tag_offload_stats, block, "TM_THRESH=128")
711 {
712     uint64_t tag = 0x11;
713     std::vector<char> buf(256, 0);
714 
715     activate_offload(sender());
716 
717     // Check BLOCK_NON_CONTIG
718     ucp::data_type_desc_t dt_desc(DATATYPE_IOV, buf.data(), buf.size(), 1);
719     request *rreq = recv_nb_and_check(dt_desc.buf(), dt_desc.count(),
720                                       dt_desc.dt(), tag, UCP_TAG_MASK_FULL);
721 
722     wait_counter(worker_offload_stats(receiver()),
723                  UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NON_CONTIG);
724 
725     req_cancel(receiver(), rreq);
726 
727     // Check BLOCK_WILDCARD
728     rreq = recv_nb_and_check(buf.data(), buf.size(), DATATYPE, tag, 0);
729 
730     wait_counter(worker_offload_stats(receiver()),
731                  UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_WILDCARD);
732 
733     req_cancel(receiver(), rreq);
734 
735     // Check BLOCK_TAG_EXCEED
736     std::vector<request*> reqs;
737     uint64_t cnt;
738     unsigned limit = 1000; // Just a big value to avoid test hang
739     do {
740         rreq = recv_nb_and_check(buf.data(), buf.size(), DATATYPE, tag,
741                                  UCP_TAG_MASK_FULL);
742         cnt  = UCS_STATS_GET_COUNTER(worker_offload_stats(receiver()),
743                                     UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_TAG_EXCEED);
744         reqs.push_back(rreq);
745     } while (!cnt && (--limit > 0));
746 
747     validate_offload_counter(UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_TAG_EXCEED , 1ul);
748 
749     for (std::vector<request*>::const_iterator iter = reqs.begin();
750          iter != reqs.end(); ++iter) {
751         req_cancel(receiver(), *iter);
752     }
753 }
754 
// Eager-protocol counters: first message arrives before offload activation
// (unexpected), second matches an offloaded expected receive.
UCS_TEST_P(test_ucp_tag_offload_stats, eager, "RNDV_THRESH=1000", "TM_THRESH=64")
{
    size_t size = 512; // Size smaller than RNDV, but bigger than TM thresh

    // Offload is not activated, so the first message should arrive unexpectedly
    test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_EGR);
    test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED);
}
763 
// Rendezvous-protocol counters: unexpected RNDV first, matched RNDV second.
UCS_TEST_P(test_ucp_tag_offload_stats, rndv, "RNDV_THRESH=1000")
{
    size_t size = 2048; // Size bigger than RNDV thresh

    // Offload is not activated, so the first message should arrive unexpectedly
    test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_RNDV);
    test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED);
}
772 
// SW rendezvous counters: the IOV (noncontig) send forces SW RNDV even
// though HW rendezvous offload is available.
UCS_TEST_P(test_ucp_tag_offload_stats, sw_rndv, "RNDV_THRESH=1000")
{
    size_t size = 2048; // Size bigger than RNDV thresh

    // Offload is not activated, so the first message should arrive unexpectedly
    test_send_recv(size, true, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_SW_RNDV);
    test_send_recv(size, true, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED_SW_RNDV);
}
781 
// Same SW-RNDV counters, but forced via TM_SW_RNDV=y with a contiguous send.
UCS_TEST_P(test_ucp_tag_offload_stats, force_sw_rndv, "TM_SW_RNDV=y",
                                                      "RNDV_THRESH=1000")
{
    size_t size = 2048; // Size bigger than RNDV thresh

    // Offload is not activated, so the first message should arrive unexpectedly
    test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_SW_RNDV);
    test_send_recv(size, false, UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED_SW_RNDV);
}
791 
792 
793 UCP_INSTANTIATE_TAG_OFFLOAD_TEST_CASE(test_ucp_tag_offload_stats)
794 
795 
796 class test_ucp_tag_offload_stats_gpu : public test_ucp_tag_offload_stats {
797 public:
test_ucp_tag_offload_stats_gpu()798     test_ucp_tag_offload_stats_gpu() {
799         m_env.push_back(new ucs::scoped_setenv("UCX_IB_GPU_DIRECT_RDMA", "n"));
800     }
801 
802     std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)803     static enum_test_params(const ucp_params_t& ctx_params,
804                             const std::string& name,
805                             const std::string& test_case_name,
806                             const std::string& tls)
807     {
808         std::vector<ucp_test_param> result;
809 
810         generate_test_params_variant(ctx_params, name, test_case_name,
811                                      tls, UCS_MEMORY_TYPE_CUDA, result);
812         generate_test_params_variant(ctx_params, name, test_case_name,
813                                      tls, UCS_MEMORY_TYPE_ROCM, result);
814 
815         return result;
816     }
817 
818 protected:
mem_type() const819     ucs_memory_type_t mem_type() const {
820         return static_cast<ucs_memory_type_t>(GetParam().variant);
821     }
822 };
823 
// With GPU-direct RDMA disabled, a GPU receive buffer cannot be registered
// for HW offload: the request must stay in SW (BLOCK_MEM_REG bumped,
// POSTED stays zero).
UCS_TEST_P(test_ucp_tag_offload_stats_gpu, block_gpu_no_gpu_direct,
           "TM_THRESH=128")
{
    activate_offload(sender());

    size_t size   = 2048;
    // Test will be skipped here if GPU mem is not supported
    mem_buffer rbuf(size, mem_type());
    request *rreq = recv_nb_and_check(rbuf.ptr(), size, DATATYPE, 0x11,
                                      UCP_TAG_MASK_FULL);

    wait_counter(worker_offload_stats(receiver()),
                 UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_MEM_REG);

    validate_offload_counter(UCP_WORKER_STAT_TAG_OFFLOAD_POSTED, 0ul);

    req_cancel(receiver(), rreq);
}
842 
// Instantiate on offload transports together with a GPU copy transport
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_tag_offload_stats_gpu, rc_dc_gpu,
                              "dc_x,rc_x," UCP_TEST_GPU_COPY_TLS)
845 
846 #endif
847