1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #include "ucp_test.h"
8 #include "common/test.h"
9 #include "ucp/ucp_test.h"
10 
11 #include <algorithm>
12 #include <set>
13 
14 extern "C" {
15 #include <ucp/wireup/address.h>
16 #include <ucp/core/ucp_ep.inl>
17 #include <ucs/sys/math.h>
18 }
19 
20 class test_ucp_wireup : public ucp_test {
21 public:
22     static std::vector<ucp_test_param>
23     enum_test_params_features(const ucp_params_t& ctx_params,
24                               const std::string& name,
25                               const std::string& test_case_name,
26                               const std::string& tls,
27                               uint64_t features, bool test_all = 0);
28 
29 protected:
30     enum {
31         TEST_RMA     = UCS_BIT(0),
32         TEST_TAG     = UCS_BIT(1),
33         TEST_STREAM  = UCS_BIT(2),
34         UNIFIED_MODE = UCS_BIT(3),
35         TEST_AMO     = UCS_BIT(4)
36     };
37 
38     typedef uint64_t               elem_type;
39     typedef std::vector<elem_type> vec_type;
40 
41     static const size_t BUFFER_LENGTH    = 16384;
42     static const ucp_datatype_t DT_U64 = ucp_dt_make_contig(sizeof(elem_type));
43     static const uint64_t TAG          = 0xdeadbeef;
44     static const elem_type SEND_DATA   = 0xdeadbeef12121212ull;
45 
46     virtual void init();
47     virtual void cleanup();
48 
49     void send_nb(ucp_ep_h ep, size_t length, int repeat, std::vector<void*>& reqs,
50                  uint64_t send_data = SEND_DATA);
51 
52     void send_b(ucp_ep_h ep, size_t length, int repeat,
53                 uint64_t send_data = SEND_DATA);
54 
55     void recv_b(ucp_worker_h worker, ucp_ep_h ep, size_t length, int repeat,
56                 uint64_t recv_data = SEND_DATA);
57 
58     void send_recv(ucp_ep_h send_ep, ucp_worker_h recv_worker, ucp_ep_h recv_ep,
59                    size_t vecsize, int repeat);
60 
61     void waitall(std::vector<void*> reqs);
62 
63     void disconnect(ucp_ep_h ep);
64 
65     void disconnect(ucp_test::entity &e);
66 
67     static void send_completion(void *request, ucs_status_t status);
68 
69     static void tag_recv_completion(void *request, ucs_status_t status,
70                                     ucp_tag_recv_info_t *info);
71 
72     void rkeys_cleanup();
73 
74     void memhs_cleanup();
75 
76     void clear_recv_data();
77 
78     void fill_send_data();
79 
80     ucp_rkey_h get_rkey(ucp_ep_h ep, ucp_mem_h memh);
81 
82 protected:
83     vec_type                               m_send_data;
84     vec_type                               m_recv_data;
85     ucs::handle<ucp_mem_h, ucp_context_h>  m_memh_sender;
86     ucs::handle<ucp_mem_h, ucp_context_h>  m_memh_receiver;
87     std::vector< ucs::handle<ucp_rkey_h> > m_rkeys;
88 
89 private:
90     static void stream_recv_completion(void *request, ucs_status_t status,
91                                        size_t length);
92 
93     static void unmap_memh(ucp_mem_h memh, ucp_context_h context);
94 };
95 
96 std::vector<ucp_test_param>
enum_test_params_features(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls,uint64_t features,bool test_all)97 test_ucp_wireup::enum_test_params_features(const ucp_params_t& ctx_params,
98                                            const std::string& name,
99                                            const std::string& test_case_name,
100                                            const std::string& tls,
101                                            uint64_t features, bool test_all)
102 {
103     std::vector<ucp_test_param> result;
104     ucp_params_t tmp_ctx_params = ctx_params;
105 
106     if (features & UCP_FEATURE_RMA) {
107         tmp_ctx_params.features = UCP_FEATURE_RMA;
108         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/rma",
109                                      tls, TEST_RMA, result);
110 
111         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/rma",
112                                      tls, TEST_RMA | UNIFIED_MODE, result);
113     }
114 
115     if (features & UCP_FEATURE_TAG) {
116         tmp_ctx_params.features = UCP_FEATURE_TAG;
117         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/tag",
118                                      tls, TEST_TAG, result);
119 
120         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/tag",
121                                      tls, TEST_TAG | UNIFIED_MODE, result);
122     }
123 
124     if (features & UCP_FEATURE_STREAM) {
125         tmp_ctx_params.features = UCP_FEATURE_STREAM;
126         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/stream",
127                                      tls, TEST_STREAM, result);
128 
129         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/stream",
130                                      tls, TEST_STREAM | UNIFIED_MODE, result);
131     }
132 
133     if (features & (UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64)) {
134         tmp_ctx_params.features = (UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64);
135         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/amo",
136                                      tls, TEST_AMO, result);
137     }
138 
139     if (test_all) {
140         uint64_t all_flags = (TEST_TAG | TEST_RMA | TEST_STREAM);
141         tmp_ctx_params.features = features;
142         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/all",
143                                      tls, all_flags, result);
144 
145         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/all",
146                                      tls, all_flags | UNIFIED_MODE, result);
147     }
148 
149     return result;
150 }
151 
unmap_memh(ucp_mem_h memh,ucp_context_h context)152 void test_ucp_wireup::unmap_memh(ucp_mem_h memh, ucp_context_h context)
153 {
154     ucs_status_t status = ucp_mem_unmap(context, memh);
155     if (status != UCS_OK) {
156         ucs_warn("failed to unmap memory: %s", ucs_status_string(status));
157     }
158 }
159 
init()160 void test_ucp_wireup::init()
161 {
162     if (GetParam().variant & UNIFIED_MODE) {
163         modify_config("UNIFIED_MODE",  "y");
164     }
165 
166     ucp_test::init();
167 
168     m_send_data.resize(BUFFER_LENGTH, 0);
169     m_recv_data.resize(BUFFER_LENGTH, 0);
170 
171     if (GetParam().variant & (TEST_RMA | TEST_AMO)) {
172         ucs_status_t status;
173         ucp_mem_map_params_t params;
174         ucp_mem_h memh;
175 
176         params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
177                             UCP_MEM_MAP_PARAM_FIELD_LENGTH |
178                             UCP_MEM_MAP_PARAM_FIELD_FLAGS;
179         params.address    = &m_recv_data[0];
180         params.length     = m_recv_data.size() * sizeof(m_recv_data[0]);
181         params.flags      = 0;
182 
183         status = ucp_mem_map(sender().ucph(), &params, &memh);
184         ASSERT_UCS_OK(status);
185         m_memh_sender.reset(memh, unmap_memh, sender().ucph());
186 
187         status = ucp_mem_map(receiver().ucph(), &params, &memh);
188         ASSERT_UCS_OK(status);
189         m_memh_receiver.reset(memh, unmap_memh, receiver().ucph());
190     }
191 }
192 
get_rkey(ucp_ep_h ep,ucp_mem_h memh)193 ucp_rkey_h test_ucp_wireup::get_rkey(ucp_ep_h ep, ucp_mem_h memh)
194 {
195     void *rkey_buffer;
196     size_t rkey_size;
197     ucs_status_t status;
198     ucp_rkey_h rkey;
199 
200     if (memh == m_memh_receiver) {
201         status = ucp_rkey_pack(receiver().ucph(), memh, &rkey_buffer, &rkey_size);
202     } else if (memh == m_memh_sender) {
203         status = ucp_rkey_pack(sender().ucph(), memh, &rkey_buffer, &rkey_size);
204     } else {
205         status = UCS_ERR_INVALID_PARAM;
206     }
207     ASSERT_UCS_OK(status);
208 
209     status = ucp_ep_rkey_unpack(ep, rkey_buffer, &rkey);
210     ASSERT_UCS_OK(status);
211 
212     ucp_rkey_buffer_release(rkey_buffer);
213 
214     return rkey;
215 }
216 
rkeys_cleanup()217 void test_ucp_wireup::rkeys_cleanup() {
218     m_rkeys.clear();
219 }
220 
memhs_cleanup()221 void test_ucp_wireup::memhs_cleanup() {
222     m_memh_sender.reset();
223     m_memh_receiver.reset();
224 }
225 
cleanup()226 void test_ucp_wireup::cleanup() {
227     rkeys_cleanup();
228     memhs_cleanup();
229     ucp_test::cleanup();
230 }
231 
clear_recv_data()232 void test_ucp_wireup::clear_recv_data() {
233     std::fill(m_recv_data.begin(), m_recv_data.end(), 0);
234 }
235 
send_nb(ucp_ep_h ep,size_t length,int repeat,std::vector<void * > & reqs,uint64_t send_data)236 void test_ucp_wireup::send_nb(ucp_ep_h ep, size_t length, int repeat,
237                               std::vector<void*>& reqs, uint64_t send_data)
238 {
239     if (GetParam().variant & TEST_TAG) {
240         std::fill(m_send_data.begin(), m_send_data.end(), send_data);
241         for (int i = 0; i < repeat; ++i) {
242             void *req = ucp_tag_send_nb(ep, &m_send_data[0], length,
243                                         DT_U64, TAG, send_completion);
244             if (UCS_PTR_IS_PTR(req)) {
245                 reqs.push_back(req);
246             } else {
247                 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
248             }
249         }
250     } else if (GetParam().variant & TEST_STREAM) {
251         std::fill(m_send_data.begin(), m_send_data.end(), send_data);
252         for (int i = 0; i < repeat; ++i) {
253             void *req = ucp_stream_send_nb(ep, &m_send_data[0], length, DT_U64,
254                                            send_completion, 0);
255             if (UCS_PTR_IS_PTR(req)) {
256                 reqs.push_back(req);
257             } else {
258                 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
259             }
260         }
261     } else if (GetParam().variant & TEST_RMA) {
262         clear_recv_data();
263 
264         ucp_mem_h memh  = (sender().ucph() == ep->worker->context) ?
265                             m_memh_receiver : m_memh_sender;
266         ucp_rkey_h rkey = get_rkey(ep, memh);
267 
268         m_rkeys.push_back(ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy));
269 
270         for (int i = 0; i < repeat; ++i) {
271             std::fill(m_send_data.begin(), m_send_data.end(), send_data + i);
272             void *req = ucp_put_nb(ep, &m_send_data[0],
273                                    m_send_data.size() * sizeof(m_send_data[0]),
274                                    (uintptr_t)&m_recv_data[0], rkey,
275                                    send_completion);
276             if (UCS_PTR_IS_PTR(req)) {
277                 reqs.push_back(req);
278             } else {
279                 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
280             }
281         }
282     }
283 }
284 
send_b(ucp_ep_h ep,size_t length,int repeat,uint64_t send_data)285 void test_ucp_wireup::send_b(ucp_ep_h ep, size_t length, int repeat,
286                              uint64_t send_data)
287 {
288     std::vector<void*> reqs;
289     send_nb(ep, length, repeat, reqs, send_data);
290     waitall(reqs);
291 }
292 
recv_b(ucp_worker_h worker,ucp_ep_h ep,size_t length,int repeat,uint64_t recv_data)293 void test_ucp_wireup::recv_b(ucp_worker_h worker, ucp_ep_h ep, size_t length,
294                              int repeat, uint64_t recv_data)
295 {
296     if (GetParam().variant & (TEST_TAG | TEST_STREAM)) {
297         for (int i = 0; i < repeat; ++i) {
298             size_t recv_length;
299             void *req;
300 
301             clear_recv_data();
302             if (GetParam().variant & TEST_TAG) {
303                 req = ucp_tag_recv_nb(worker, &m_recv_data[0], length, DT_U64,
304                                       TAG, (ucp_tag_t)-1, tag_recv_completion);
305             } else if (GetParam().variant & TEST_STREAM) {
306                 req = ucp_stream_recv_nb(ep, &m_recv_data[0], length, DT_U64,
307                                          stream_recv_completion, &recv_length,
308                                          UCP_STREAM_RECV_FLAG_WAITALL);
309             } else {
310                 req = NULL;
311             }
312             if (UCS_PTR_IS_PTR(req)) {
313                 wait(req);
314             } else {
315                 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
316             }
317             EXPECT_EQ(recv_data, m_recv_data[0])
318                       << "repeat " << i << "/" << repeat;
319             EXPECT_EQ(length,
320                       (size_t)std::count(m_recv_data.begin(),
321                                          m_recv_data.begin() + length,
322                                          recv_data));
323         }
324     } else if (GetParam().variant & TEST_RMA) {
325         for (size_t i = 0; i < length; ++i) {
326             while (m_recv_data[i] != recv_data + repeat - 1) {
327                 progress();
328             }
329         }
330     }
331 }
332 
send_completion(void * request,ucs_status_t status)333 void test_ucp_wireup::send_completion(void *request, ucs_status_t status)
334 {
335 }
336 
tag_recv_completion(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)337 void test_ucp_wireup::tag_recv_completion(void *request, ucs_status_t status,
338                                           ucp_tag_recv_info_t *info)
339 {
340 }
341 
stream_recv_completion(void * request,ucs_status_t status,size_t length)342 void test_ucp_wireup::stream_recv_completion(void *request, ucs_status_t status,
343                                              size_t length)
344 {
345 }
346 
send_recv(ucp_ep_h send_ep,ucp_worker_h recv_worker,ucp_ep_h recv_ep,size_t length,int repeat)347 void test_ucp_wireup::send_recv(ucp_ep_h send_ep, ucp_worker_h recv_worker,
348                                 ucp_ep_h recv_ep, size_t length, int repeat)
349 {
350     std::vector<void*> send_reqs;
351     static uint64_t next_send_data = 0;
352     uint64_t send_data = next_send_data++;
353 
354     send_nb(send_ep, length, repeat, send_reqs, send_data);
355     recv_b (recv_worker, recv_ep, length, repeat, send_data);
356     waitall(send_reqs);
357     m_rkeys.clear();
358 }
359 
disconnect(ucp_ep_h ep)360 void test_ucp_wireup::disconnect(ucp_ep_h ep) {
361     void *req = ucp_disconnect_nb(ep);
362     if (!UCS_PTR_IS_PTR(req)) {
363         ASSERT_UCS_OK(UCS_PTR_STATUS(req));
364     }
365     wait(req);
366 }
367 
disconnect(ucp_test::entity & e)368 void test_ucp_wireup::disconnect(ucp_test::entity &e) {
369     disconnect(e.revoke_ep());
370 }
371 
waitall(std::vector<void * > reqs)372 void test_ucp_wireup::waitall(std::vector<void*> reqs)
373 {
374     while (!reqs.empty()) {
375         wait(reqs.back());
376         reqs.pop_back();
377     }
378 }
379 
380 class test_ucp_wireup_1sided : public test_ucp_wireup {
381 public:
382     static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)383     enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
384                      const std::string& test_case_name, const std::string& tls)
385     {
386         return enum_test_params_features(ctx_params, name, test_case_name, tls,
387                                          UCP_FEATURE_RMA | UCP_FEATURE_TAG);
388     }
389 
test_ucp_wireup_1sided()390     test_ucp_wireup_1sided() {
391         for (ucp_lane_index_t i = 0; i < UCP_MAX_LANES; ++i) {
392             m_lanes2remote[i] = i;
393         }
394     }
395 
396     ucp_lane_index_t m_lanes2remote[UCP_MAX_LANES];
397 };
398 
/*
 * Pack the worker address with all transports, unpack it again, and check
 * that the uuid (and name, with debug data) survive and that the set of
 * device priorities is the same before and after.
 */
UCS_TEST_P(test_ucp_wireup_1sided, address) {
    ucp_worker_h worker = sender().worker();
    size_t packed_size;
    void *packed;

    ucs_status_t status = ucp_address_pack(worker, NULL,
                                           std::numeric_limits<uint64_t>::max(),
                                           UCP_ADDRESS_PACK_FLAGS_ALL,
                                           m_lanes2remote, &packed_size,
                                           &packed);
    ASSERT_UCS_OK(status);
    ASSERT_TRUE(packed != NULL);
    ASSERT_GT(packed_size, 0ul);
    EXPECT_LE(packed_size, 2048ul); /* Expect a reasonable address size */

    /* Collect the priorities of all non-sockaddr transport resources */
    std::set<uint8_t> packed_dev_priorities;
    ucp_rsc_index_t tl;
    ucs_for_each_bit(tl, worker->context->tl_bitmap) {
        if (!(worker->context->tl_rscs[tl].flags & UCP_TL_RSC_FLAG_SOCKADDR)) {
            packed_dev_priorities.insert(
                    ucp_worker_iface_get_attr(worker, tl)->priority);
        }
    }

    ucp_unpacked_address unpacked;
    status = ucp_address_unpack(worker, packed, UCP_ADDRESS_PACK_FLAGS_ALL,
                                &unpacked);
    ASSERT_UCS_OK(status);

    EXPECT_EQ(worker->uuid, unpacked.uuid);
#if ENABLE_DEBUG_DATA
    EXPECT_EQ(std::string(ucp_worker_get_name(worker)),
              std::string(unpacked.name));
#endif
    EXPECT_LE(unpacked.address_count,
              static_cast<unsigned>(sender().ucph()->num_tls));

    /* Collect the priorities found in the unpacked address entries */
    std::set<uint8_t> unpacked_dev_priorities;
    const ucp_address_entry_t *entry;
    ucp_unpacked_address_for_each(entry, &unpacked) {
        unpacked_dev_priorities.insert(entry->iface_attr.priority);
    }

    /* TODO test addresses */

    ucs_free(unpacked.address_list);
    ucs_free(packed);

    /* The packed device priorities must match the unpacked ones */
    ASSERT_TRUE(packed_dev_priorities == unpacked_dev_priorities);
}
449 
450 UCS_TEST_P(test_ucp_wireup_1sided, ep_address, "IB_NUM_PATHS?=2") {
451     ucs_status_t status;
452     size_t size;
453     void *buffer;
454 
455     sender().connect(&receiver(), get_ep_params());
456 
457     status = ucp_address_pack(sender().worker(), sender().ep(),
458                               std::numeric_limits<uint64_t>::max(),
459                               UCP_ADDRESS_PACK_FLAGS_ALL, m_lanes2remote, &size,
460                               &buffer);
461     ASSERT_UCS_OK(status);
462     ASSERT_TRUE(buffer != NULL);
463 
464     ucp_unpacked_address unpacked_address;
465 
466     status = ucp_address_unpack(sender().worker(), buffer,
467                                 UCP_ADDRESS_PACK_FLAGS_ALL, &unpacked_address);
468     ASSERT_UCS_OK(status);
469 
470     EXPECT_EQ(sender().worker()->uuid, unpacked_address.uuid);
471     EXPECT_LE(unpacked_address.address_count,
472               static_cast<unsigned>(sender().ucph()->num_tls));
473 
474     ucs_free(unpacked_address.address_list);
475     ucs_free(buffer);
476 }
477 
/*
 * Pack the worker address with an empty transport bitmap (0) and check that
 * unpacking yields the worker uuid (and name, with debug data) and no address
 * entries.
 */
UCS_TEST_P(test_ucp_wireup_1sided, empty_address) {
    size_t packed_size;
    void *packed;

    ucs_status_t status = ucp_address_pack(sender().worker(), NULL, 0,
                                           UCP_ADDRESS_PACK_FLAGS_ALL,
                                           m_lanes2remote, &packed_size,
                                           &packed);
    ASSERT_UCS_OK(status);
    ASSERT_TRUE(packed != NULL);
    ASSERT_GT(packed_size, 0ul);

    ucp_unpacked_address unpacked;
    status = ucp_address_unpack(sender().worker(), packed,
                                UCP_ADDRESS_PACK_FLAGS_ALL, &unpacked);
    ASSERT_UCS_OK(status);

    EXPECT_EQ(sender().worker()->uuid, unpacked.uuid);
#if ENABLE_DEBUG_DATA
    EXPECT_EQ(std::string(ucp_worker_get_name(sender().worker())),
              std::string(unpacked.name));
#endif
    EXPECT_EQ(0u, unpacked.address_count);

    ucs_free(unpacked.address_list);
    ucs_free(packed);
}
506 
/* Connect only from the sender side and transfer a single element once */
UCS_TEST_P(test_ucp_wireup_1sided, one_sided_wireup) {
    sender().connect(&receiver(), get_ep_params());

    const size_t length = 1;
    const int repeat    = 1;
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), length,
              repeat);

    flush_worker(sender());
}
512 
513 UCS_TEST_P(test_ucp_wireup_1sided, one_sided_wireup_rndv, "RNDV_THRESH=1") {
514     sender().connect(&receiver(), get_ep_params());
515     send_recv(sender().ep(), receiver().worker(), receiver().ep(), BUFFER_LENGTH, 1);
516     if (is_loopback() && (GetParam().variant & TEST_TAG)) {
517         /* expect the endpoint to be connected to itself */
518         ucp_ep_h ep = sender().ep();
519         EXPECT_EQ((uintptr_t)ep, ucp_ep_dest_ep_ptr(ep));
520     }
521     flush_worker(sender());
522 }
523 
/*
 * Create entities until there are 10 of them, then connect the sender to
 * entities 0..9, using the entity index as the endpoint index.
 */
UCS_TEST_P(test_ucp_wireup_1sided, multi_wireup) {
    skip_loopback();

    const size_t num_entities = 10;
    for (; entities().size() < num_entities; ) {
        create_entity();
    }

    /* connect from sender() to each of the first 'num_entities' entities */
    size_t peer = 0;
    while (peer < num_entities) {
        sender().connect(&entities().at(peer), get_ep_params(), peer);
        ++peer;
    }
}
537 
/*
 * 30 rounds of: connect, transfer many messages (count scaled down by the
 * square of the test time multiplier), then disconnect. When not in loopback,
 * the receiver also connects back and is disconnected.
 */
UCS_TEST_P(test_ucp_wireup_1sided, stress_connect) {
    for (int round = 0; round < 30; ++round) {
        sender().connect(&receiver(), get_ep_params());

        const int repeat = 10000 / (ucs::test_time_multiplier() *
                                    ucs::test_time_multiplier());
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1,
                  repeat);

        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }

        disconnect(sender());

        if (!is_loopback()) {
            disconnect(receiver());
        }
    }
}
554 
/*
 * Many connect / single transfer / disconnect rounds. The number of rounds is
 * at least 10, scaled down by the square of the test time multiplier, and
 * limited to half of max_connections().
 */
UCS_TEST_P(test_ucp_wireup_1sided, stress_connect2) {
    int max_count = (int)ucs_max(10,
                                 (1000.0 / (ucs::test_time_multiplier() *
                                            ucs::test_time_multiplier())));
    int rounds    = ucs_min(max_count, max_connections() / 2);

    int round = 0;
    while (round < rounds) {
        sender().connect(&receiver(), get_ep_params());
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);

        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }

        disconnect(sender());

        if (!is_loopback()) {
            disconnect(receiver());
        }

        ++round;
    }
}
574 
/*
 * Connect and immediately disconnect the sender without sending anything,
 * then destroy the receiver worker followed by the sender worker.
 */
UCS_TEST_P(test_ucp_wireup_1sided, disconnect_nonexistent) {
    skip_loopback();

    /* create and tear down the endpoint without any traffic */
    sender().connect(&receiver(), get_ep_params());
    disconnect(sender());

    /* destroy the workers: receiver first */
    receiver().destroy_worker();
    sender().destroy_worker();
}
582 
/*
 * Two identical rounds of: connect, send, disconnect the sender, and only
 * then receive on the other side.
 */
UCS_TEST_P(test_ucp_wireup_1sided, disconnect_reconnect) {
    const size_t length = 1000;
    const int repeat    = 1;

    for (int round = 0; round < 2; ++round) {
        sender().connect(&receiver(), get_ep_params());
        send_b(sender().ep(), length, repeat);
        disconnect(sender());
        recv_b(receiver().worker(), receiver().ep(), length, repeat);
    }
}
594 
/* Send 100 messages, disconnect the sender, and receive them afterwards */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided) {
    const size_t length = 1000;
    const int repeat    = 100;

    sender().connect(&receiver(), get_ep_params());

    send_b(sender().ep(), length, repeat);
    disconnect(sender());

    recv_b(receiver().worker(), receiver().ep(), length, repeat);
}
601 
602 UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided_nozcopy, "ZCOPY_THRESH=-1") {
603     sender().connect(&receiver(), get_ep_params());
604     send_b(sender().ep(), 1000, 100);
605     disconnect(sender());
606     recv_b(receiver().worker(), receiver().ep(), 1000, 100);
607 }
608 
/*
 * Complete one send/receive exchange first, then send 200 messages,
 * disconnect the sender, and receive them afterwards.
 */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided_wait) {
    const size_t length = 1000;
    const int repeat    = 200;

    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 8, 1);

    send_b(sender().ep(), length, repeat);
    disconnect(sender());

    recv_b(receiver().worker(), receiver().ep(), length, repeat);
}
616 
/*
 * Both sides connect up front (receiver only when not in loopback). The
 * sender sends and disconnects; the receiver then receives, replies and
 * disconnects; finally the reply is received on the sender worker.
 */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_reply1) {
    const size_t length = 8;
    const int repeat    = 1;

    sender().connect(&receiver(), get_ep_params());
    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }

    /* request */
    send_b(sender().ep(), length, repeat);
    if (!is_loopback()) {
        disconnect(sender());
    }
    recv_b(receiver().worker(), receiver().ep(), length, repeat);

    /* reply */
    send_b(receiver().ep(), length, repeat);
    disconnect(receiver());
    recv_b(sender().worker(), sender().ep(), length, repeat);
}
633 
/*
 * Like send_disconnect_reply1, but the receiver connects back only after the
 * sender has already sent and disconnected.
 */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_reply2) {
    const size_t length = 8;
    const int repeat    = 1;

    sender().connect(&receiver(), get_ep_params());

    /* request */
    send_b(sender().ep(), length, repeat);
    if (!is_loopback()) {
        disconnect(sender());
    }
    recv_b(receiver().worker(), receiver().ep(),  length, repeat);

    /* the reverse connection is created late */
    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }

    /* reply */
    send_b(receiver().ep(), length, repeat);
    disconnect(receiver());
    recv_b(sender().worker(), receiver().ep(), length, repeat);
}
651 
/*
 * Start 1000 non-blocking sends, issue a non-blocking disconnect and progress
 * until it completes or the deadline passes. Then release the disconnect
 * request, receive the data, and wait for the send requests.
 */
UCS_TEST_P(test_ucp_wireup_1sided, disconnect_nb_onesided) {
    sender().connect(&receiver(), get_ep_params());

    std::vector<void*> send_reqs;
    send_nb(sender().ep(), 1000, 1000, send_reqs);

    void *close_req     = sender().disconnect_nb();
    ucs_time_t deadline = ucs::get_deadline();
    for (;;) {
        if (is_request_completed(close_req) || (ucs_get_time() >= deadline)) {
            break;
        }
        progress();
    }

    sender().close_ep_req_free(close_req);

    recv_b(receiver().worker(), receiver().ep(), 1000, 1000);
    waitall(send_reqs);
}
669 
/*
 * Create 10 endpoints from the sender to the receiver, then run one
 * send/receive exchange over each of them.
 */
UCS_TEST_P(test_ucp_wireup_1sided, multi_ep_1sided) {
    const unsigned num_eps = 10;
    unsigned ep_idx;

    for (ep_idx = 0; ep_idx < num_eps; ++ep_idx) {
        sender().connect(&receiver(), get_ep_params(), ep_idx);
    }

    for (ep_idx = 0; ep_idx < num_eps; ++ep_idx) {
        ucp_ep_h send_ep = sender().ep(0, ep_idx);
        send_recv(send_ep, receiver().worker(), receiver().ep(), 8, 1);
    }
}
681 
682 UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_1sided)
683 
684 class test_ucp_wireup_2sided : public test_ucp_wireup {
685 public:
686     static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)687     enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
688                      const std::string& test_case_name, const std::string& tls)
689     {
690         return enum_test_params_features(ctx_params, name, test_case_name, tls,
691                                          UCP_FEATURE_RMA | UCP_FEATURE_TAG |
692                                          UCP_FEATURE_STREAM);
693     }
694 
695 protected:
696     void test_connect_loopback(bool delay_before_connect, bool enable_loopback);
697 };
698 
/*
 * Connect in both directions (the reverse direction only when not in
 * loopback) and exchange one element each way, flushing the sending worker
 * after each transfer.
 */
UCS_TEST_P(test_ucp_wireup_2sided, two_sided_wireup) {
    const size_t length = 1;
    const int repeat    = 1;

    sender().connect(&receiver(), get_ep_params());
    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }

    /* sender -> receiver */
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), length,
              repeat);
    flush_worker(sender());

    /* receiver -> sender */
    send_recv(receiver().ep(), sender().worker(), sender().ep(), length,
              repeat);
    flush_worker(receiver());
}
710 
test_connect_loopback(bool delay_before_connect,bool enable_loopback)711 void test_ucp_wireup_2sided::test_connect_loopback(bool delay_before_connect,
712                                                    bool enable_loopback) {
713     ucp_ep_params_t params = test_ucp_wireup::get_ep_params();
714     if (!enable_loopback) {
715         params.field_mask |= UCP_EP_PARAM_FIELD_FLAGS;
716         params.flags      |= UCP_EP_PARAMS_FLAGS_NO_LOOPBACK;
717     }
718 
719     for (int i = 0; i < 5; ++i) {
720         int base_index = i * 2;
721         sender().connect(&sender(), params, base_index);
722         ucp_ep_h ep1 = sender().ep(0, base_index);
723 
724         if (delay_before_connect) {
725             /* let one side create ep */
726             short_progress_loop(0);
727         }
728 
729         sender().connect(&sender(), params, base_index + 1);
730         ucp_ep_h ep2 = sender().ep(0, base_index + 1);
731 
732         EXPECT_NE(ep1, ep2);
733 
734         if (GetParam().variant & TEST_STREAM) {
735             uint64_t data1 = (base_index * 10) + 1;
736             uint64_t data2 = (base_index * 10) + 2;
737 
738             send_b(ep1, 1, 1, data1);
739             send_b(ep2, 1, 1, data2);
740 
741             if (enable_loopback) {
742                 /* self-send - each ep receives what was sent on it */
743                 recv_b(sender().worker(), ep1, 1, 1, data1);
744                 recv_b(sender().worker(), ep2, 1, 1, data2);
745             } else {
746                 /* cross-send - each ep receives what was sent on the other ep */
747                 recv_b(sender().worker(), ep1, 1, 1, data2);
748                 recv_b(sender().worker(), ep2, 1, 1, data1);
749             }
750         }
751     }
752     flush_worker(sender());
753 }
754 
/* Self-connection with loopback enabled and no delay between the endpoints */
UCS_TEST_P(test_ucp_wireup_2sided, loopback) {
    const bool delay_before_connect = false;
    const bool enable_loopback      = true;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
758 
/* Self-connection with loopback enabled and a delay between the endpoints */
UCS_TEST_P(test_ucp_wireup_2sided, loopback_with_delay) {
    const bool delay_before_connect = true;
    const bool enable_loopback      = true;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
762 
/* Self-connection with loopback disabled and no delay between the endpoints */
UCS_TEST_P(test_ucp_wireup_2sided, no_loopback) {
    const bool delay_before_connect = false;
    const bool enable_loopback      = false;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
766 
/* Self-connection with loopback disabled and a delay between the endpoints */
UCS_TEST_P(test_ucp_wireup_2sided, no_loopback_with_delay) {
    const bool delay_before_connect = true;
    const bool enable_loopback      = false;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
770 
771 UCS_TEST_SKIP_COND_P(test_ucp_wireup_2sided, async_connect,
772                      !(GetParam().ctx_params.features & UCP_FEATURE_TAG)) {
773     sender().connect(&receiver(), get_ep_params());
774     ucp_ep_h send_ep = sender().ep();
775     std::vector<void *> reqs;
776 
777     reqs.push_back(ucp_tag_send_nb(send_ep, NULL, 0, DT_U64, 1, send_completion));
778     EXPECT_FALSE(UCS_PTR_IS_ERR(reqs.back()));
779 
780     ucs_time_t deadline = ucs::get_deadline();
781     /* waiting of async reply on wiriup without calling progress on receiver */
782     while(!(send_ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED) &&
783           (ucs_get_time() < deadline)) {
784         ucp_worker_progress(sender().worker());
785         ucp_worker_progress(receiver().worker());
786     }
787 
788     reqs.push_back(ucp_tag_recv_nb(receiver().worker(), NULL, 0, DT_U64, 1,
789                                    (ucp_tag_t)-1, tag_recv_completion));
790     EXPECT_FALSE(UCS_PTR_IS_ERR(reqs.back()));
791     waitall(reqs);
792 }
793 
/*
 * Connect in both directions and disconnect again without any traffic. The
 * receiver side is only connected / disconnected when not in loopback.
 */
UCS_TEST_P(test_ucp_wireup_2sided, connect_disconnect) {
    /* connect */
    sender().connect(&receiver(), get_ep_params());
    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }

    /* disconnect */
    disconnect(sender());
    if (!is_loopback()) {
        disconnect(receiver());
    }
}
804 
/* Create several batches of endpoint pairs between sender and receiver and
 * exchange data in both directions over every pair of each batch. */
UCS_TEST_P(test_ucp_wireup_2sided, multi_ep_2sided) {
    const unsigned eps_per_round = 10;
    const unsigned num_rounds    = 4;

    for (unsigned round = 0; round < num_rounds; ++round) {
        /* endpoint indices used by this round: [first_ep, end_ep) */
        const unsigned first_ep = round * eps_per_round;
        const unsigned end_ep   = first_ep + eps_per_round;

        for (unsigned ep_idx = first_ep; ep_idx < end_ep; ++ep_idx) {
            sender().connect(&receiver(), get_ep_params(), ep_idx);
            if (!is_loopback()) {
                receiver().connect(&sender(), get_ep_params(), ep_idx);
            }

            UCS_TEST_MESSAGE << "iteration " << round << " pair "
                             << (ep_idx - first_ep) << ": "
                             << sender().ep(0, ep_idx) << " <--> "
                             << receiver().ep(0, ep_idx);
        }

        for (unsigned ep_idx = first_ep; ep_idx < end_ep; ++ep_idx) {
            /* sender -> receiver */
            send_recv(sender().ep(0, ep_idx), receiver().worker(),
                      receiver().ep(0, ep_idx), 8, 1);
            /* receiver -> sender */
            send_recv(receiver().ep(0, ep_idx), sender().worker(),
                      sender().ep(0, ep_idx), 8, 1);
        }

        short_progress_loop(0);
    }
}
833 
834 UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_2sided)
835 
836 class test_ucp_wireup_errh_peer : public test_ucp_wireup_1sided
837 {
838 public:
get_ep_params()839     virtual ucp_ep_params_t get_ep_params() {
840         ucp_ep_params_t params = test_ucp_wireup::get_ep_params();
841         params.field_mask     |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
842                                  UCP_EP_PARAM_FIELD_ERR_HANDLER;
843         params.err_mode        = UCP_ERR_HANDLING_MODE_PEER;
844         params.err_handler.cb  = err_cb;
845         params.err_handler.arg = NULL;
846         return params;
847     }
848 
init()849     virtual void init() {
850         test_ucp_wireup::init();
851         skip_loopback();
852     }
853 
err_cb(void *,ucp_ep_h,ucs_status_t)854     static void err_cb(void *, ucp_ep_h, ucs_status_t) {}
855 };
856 
/* The receiver creates its endpoint first, so the message sent by the sender
 * arrives after the receiver-side endpoint already exists. */
UCS_TEST_P(test_ucp_wireup_errh_peer, msg_after_ep_create) {
    /* receiver -> sender endpoint is created before anything is sent */
    receiver().connect(&sender(), get_ep_params());
    sender().connect(&receiver(), get_ep_params());

    send_recv(sender().ep(),
              receiver().worker(), receiver().ep(),
              1, 1);

    flush_worker(sender());
}
864 
/*
 * The sender sends a message before the receiver has created its own
 * endpoint; the receiver then connects back and sends in the opposite
 * direction.
 */
UCS_TEST_P(test_ucp_wireup_errh_peer, msg_before_ep_create) {

    /* sender -> receiver, while the receiver has not called connect() yet */
    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
    flush_worker(sender());

    receiver().connect(&sender(), get_ep_params());

    /* receiver -> sender: the data is received on the sender's worker, so the
     * receiving endpoint passed as 'recv_ep' is the sender's endpoint, the
     * same way the other bidirectional send_recv() calls in this file pair
     * the receiving worker with the endpoint of the same entity. */
    send_recv(receiver().ep(), sender().worker(), sender().ep(), 1, 1);
    flush_worker(receiver());
}
876 
877 UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_errh_peer)
878 
879 class test_ucp_wireup_fallback : public test_ucp_wireup {
880 public:
test_ucp_wireup_fallback()881     test_ucp_wireup_fallback() {
882         m_num_lanes = 0;
883     }
884 
885     static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)886     enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
887                      const std::string& test_case_name, const std::string& tls)
888     {
889         return enum_test_params_features(ctx_params, name, test_case_name, tls,
890                                          UCP_FEATURE_TAG | UCP_FEATURE_RMA |
891                                          UCP_FEATURE_STREAM, 1);
892     }
893 
init()894     void init() {
895         /* do nothing */
896     }
897 
cleanup()898     void cleanup() {
899         /* do nothing */
900     }
901 
check_scalable_tls(const ucp_worker_h worker,size_t est_num_eps)902     bool check_scalable_tls(const ucp_worker_h worker, size_t est_num_eps) {
903         ucp_rsc_index_t rsc_index;
904 
905         ucs_for_each_bit(rsc_index, worker->context->tl_bitmap) {
906             ucp_md_index_t md_index      = worker->context->tl_rscs[rsc_index].md_index;
907             const uct_md_attr_t *md_attr = &worker->context->tl_mds[md_index].attr;
908 
909             if ((worker->context->tl_rscs[rsc_index].flags & UCP_TL_RSC_FLAG_AUX) ||
910                 (md_attr->cap.flags & UCT_MD_FLAG_SOCKADDR) ||
911                 (worker->context->tl_rscs[rsc_index].tl_rsc.dev_type == UCT_DEVICE_TYPE_ACC)) {
912                 // Skip TLs for wireup and CM and acceleration TLs
913                 continue;
914             }
915 
916             if (ucp_worker_iface_get_attr(worker, rsc_index)->max_num_eps >= est_num_eps) {
917                 EXPECT_TRUE((worker->scalable_tl_bitmap & UCS_BIT(rsc_index)) != 0);
918                 return true;
919             } else {
920                 EXPECT_TRUE((worker->scalable_tl_bitmap & UCS_BIT(rsc_index)) == 0);
921             }
922         }
923 
924         return false;
925     }
926 
test_est_num_eps_fallback(size_t est_num_eps,unsigned long & min_max_num_eps)927     bool test_est_num_eps_fallback(size_t est_num_eps,
928                                    unsigned long &min_max_num_eps) {
929         size_t num_lanes = 0;
930         bool res         = true;
931         bool has_only_unscalable;
932 
933         min_max_num_eps = UCS_ULUNITS_INF;
934 
935         UCS_TEST_MESSAGE << "Testing " << est_num_eps << " number of EPs";
936         modify_config("NUM_EPS", ucs::to_string(est_num_eps).c_str());
937         test_ucp_wireup::init();
938 
939         sender().connect(&receiver(), get_ep_params());
940         if (!is_loopback()) {
941             receiver().connect(&sender(), get_ep_params());
942         }
943         send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
944         flush_worker(sender());
945 
946         has_only_unscalable = !check_scalable_tls(sender().worker(),
947                                                   est_num_eps);
948 
949         for (ucp_lane_index_t lane = 0;
950              lane < ucp_ep_num_lanes(sender().ep()); lane++) {
951             uct_ep_h uct_ep = sender().ep()->uct_eps[lane];
952             if (uct_ep == NULL) {
953                 continue;
954             }
955 
956             uct_iface_attr_t iface_attr;
957             ucs_status_t status = uct_iface_query(uct_ep->iface, &iface_attr);
958             ASSERT_UCS_OK(status);
959 
960             num_lanes++;
961 
962             if (!has_only_unscalable && (iface_attr.max_num_eps < est_num_eps)) {
963                 res = false;
964                 goto out;
965             }
966 
967             if (iface_attr.max_num_eps < min_max_num_eps) {
968                 min_max_num_eps = iface_attr.max_num_eps;
969             }
970         }
971 
972 out:
973         test_ucp_wireup::cleanup();
974 
975         if (est_num_eps == 1) {
976             m_num_lanes = num_lanes;
977         } else if (has_only_unscalable) {
978             /* If has only unscalable transports, check that the number of
979              * lanes is the same as for the case when "est_num_eps == 1" */
980             res = (num_lanes == m_num_lanes);
981         }
982 
983         return res;
984     }
985 
986 private:
987 
988     /* The number of lanes activated for the case when "est_num_eps == 1" */
989     size_t m_num_lanes;
990 };
991 
/*
 * Start with NUM_EPS=1 to obtain the smallest max_num_eps of the selected
 * lanes, then repeatedly test the values just below, at and just above that
 * limit. The limit reported by the last check of a round is used for the next
 * round; stop when it is infinite or did not change.
 */
UCS_TEST_P(test_ucp_wireup_fallback, est_num_eps_fallback) {
    unsigned long test_min_max_eps, min_max_eps;

    /* the result of the reference run is not checked, only its limit is used */
    test_est_num_eps_fallback(1, test_min_max_eps);

    size_t prev_min_max_eps = 0;
    for (;;) {
        if (test_min_max_eps == UCS_ULUNITS_INF) {
            break;
        }

        if (test_min_max_eps == prev_min_max_eps) {
            /* number of EPs was not changed between iterations */
            break;
        }

        if (test_min_max_eps > 1) {
            EXPECT_TRUE(test_est_num_eps_fallback(test_min_max_eps - 1,
                                                  min_max_eps));
        }

        EXPECT_TRUE(test_est_num_eps_fallback(test_min_max_eps,
                                              min_max_eps));

        EXPECT_TRUE(test_est_num_eps_fallback(test_min_max_eps + 1,
                                              min_max_eps));

        prev_min_max_eps = test_min_max_eps;
        test_min_max_eps = min_max_eps;
    }
}
1015 
1016 /* Test fallback from RC to UD, since RC isn't scalable enough
1017  * as its iface max_num_eps attribute = 256 by default */
1018 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1019                               rc_ud, "rc_x,rc_v,ud_x,ud_v")
1020 /* Test fallback selection of UD only TLs, since TCP shouldn't
1021  * be used for any lanes */
1022 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1023                               ud_tcp, "ud_x,ud_v,tcp")
1024 /* Test two scalable enough transports */
1025 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1026                               dc_ud, "dc_x,ud_x,ud_v")
/* Test unscalable transports only */
1028 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1029                               rc, "rc_x,rc_v")
1030 /* Test all available IB transports */
1031 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1032                               ib, "ib")
1033 /* Test on TCP only */
1034 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1035                               tcp, "tcp")
1036 
1037 class test_ucp_wireup_unified : public test_ucp_wireup {
1038 public:
1039     static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)1040     enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
1041                      const std::string& test_case_name, const std::string& tls)
1042     {
1043         std::vector<ucp_test_param> result;
1044         ucp_params_t tmp_ctx_params = ctx_params;
1045 
1046         tmp_ctx_params.features = UCP_FEATURE_TAG;
1047 
1048         generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/uni",
1049                                      tls, TEST_TAG | UNIFIED_MODE, result);
1050         return result;
1051     }
1052 
context_has_tls(ucp_context_h ctx,const std::string & tl,ucp_rsc_index_t md_idx)1053     bool context_has_tls(ucp_context_h ctx, const std::string& tl,
1054                          ucp_rsc_index_t md_idx)
1055     {
1056         for (ucp_rsc_index_t idx = 0; idx < ctx->num_tls; ++idx) {
1057             if (ctx->tl_rscs[idx].md_index != md_idx) {
1058                 continue;
1059             }
1060 
1061             if (!strcmp(ctx->tl_rscs[idx].tl_rsc.tl_name, tl.c_str())) {
1062                 return true;
1063             }
1064         }
1065 
1066         return false;
1067     }
1068 
worker_has_tls(ucp_worker_h worker,const std::string & tl,ucp_rsc_index_t md_idx)1069     bool worker_has_tls(ucp_worker_h worker, const std::string& tl,
1070                         ucp_rsc_index_t md_idx)
1071     {
1072         ucp_context_h ctx = worker->context;
1073 
1074         for (unsigned i = 0; i < worker->num_ifaces; ++i) {
1075             ucp_worker_iface_t *wiface = worker->ifaces[i];
1076             ucp_rsc_index_t md_idx_it  = ctx->tl_rscs[wiface->rsc_index].md_index;
1077 
1078             if (md_idx_it != md_idx) {
1079                 continue;
1080             }
1081 
1082             char* name = ctx->tl_rscs[wiface->rsc_index].tl_rsc.tl_name;
1083             if (!strcmp(name, tl.c_str())) {
1084                 return true;
1085             }
1086         }
1087         return false;
1088     }
1089 
check_unified_ifaces(entity * e,const std::string & better_tl,const std::string & tl)1090     void check_unified_ifaces(entity *e,
1091                               const std::string& better_tl,
1092                               const std::string& tl)
1093     {
1094         ucp_context_h ctx   = e->ucph();
1095         ucp_worker_h worker = e->worker();
1096 
1097         for (ucp_rsc_index_t i = 0; i < ctx->num_mds; ++i) {
1098             if (!(context_has_tls(ctx, better_tl, i) &&
1099                   context_has_tls(ctx, tl, i))) {
1100                continue;
1101             }
1102 
1103             ASSERT_TRUE(ctx->num_tls > worker->num_ifaces);
1104             EXPECT_TRUE(worker_has_tls(worker, better_tl, i));
1105             EXPECT_FALSE(worker_has_tls(worker, tl, i));
1106         }
1107     }
1108 };
1109 
1110 
/* Check which of two competing transports a worker keeps in unified mode */
UCS_TEST_P(test_ucp_wireup_unified, select_best_ifaces)
{
    entity *default_entity = &sender();

    // Accelerated transports have better performance characteristics than
    // their verbs counterparts, so the corresponding verbs transports must
    // not be used by workers in unified mode.
    check_unified_ifaces(default_entity, "rc_mlx5", "rc_verbs");
    check_unified_ifaces(default_entity, "ud_mlx5", "ud_verbs");

    // RC and DC have similar capabilities, but RC has better latency while
    // the estimated number of endpoints is relatively small.
    // sender() is created with 1 ep, so RC should be selected over DC.
    check_unified_ifaces(default_entity, "rc_mlx5", "dc_mlx5");

    // With a big enough number of endpoints DC becomes more performance
    // efficient than RC, so a newly created entity must select DC over RC.
    modify_config("NUM_EPS", "1000");
    entity *e = create_entity();
    check_unified_ifaces(e, "dc_mlx5", "rc_mlx5");
}
1130 
1131 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, rc, "rc")
1132 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, ud, "ud")
1133 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, rc_dc, "rc,dc")
1134 
1135 class test_ucp_wireup_fallback_amo : public test_ucp_wireup {
init()1136     void init() {
1137         size_t device_atomics_cnt = 0;
1138 
1139         test_ucp_wireup::init();
1140 
1141         for (ucp_rsc_index_t idx = 0; idx < sender().ucph()->num_tls; ++idx) {
1142             uct_iface_attr_t *attr = ucp_worker_iface_get_attr(sender().worker(),
1143                                                                idx);
1144             if (attr->cap.flags & UCT_IFACE_FLAG_ATOMIC_DEVICE) {
1145                 device_atomics_cnt++;
1146             }
1147         }
1148         bool device_atomics_supported = sender().worker()->atomic_tls != 0;
1149 
1150         test_ucp_wireup::cleanup();
1151 
1152         if (!device_atomics_supported || !device_atomics_cnt) {
1153             UCS_TEST_SKIP_R("there are no TLs that support device atomics");
1154         }
1155     }
1156 
cleanup()1157     void cleanup() {
1158         /* do nothing */
1159     }
1160 
1161 protected:
1162 
use_device_amo(ucp_ep_h ep)1163     bool use_device_amo(ucp_ep_h ep) {
1164         ucp_ep_config_t *ep_config = ucp_ep_config(ep);
1165 
1166         for (ucp_lane_index_t lane = 0; lane < UCP_MAX_LANES; ++lane) {
1167             if (ep_config->key.amo_lanes[lane] != UCP_NULL_LANE) {
1168                 return (ucp_ep_get_iface_attr(ep, lane)->cap.flags &
1169                         UCT_IFACE_FLAG_ATOMIC_DEVICE);
1170             }
1171         }
1172 
1173         return false;
1174     }
1175 
get_min_max_num_eps(ucp_ep_h ep)1176     size_t get_min_max_num_eps(ucp_ep_h ep) {
1177         unsigned long min_max_num_eps = UCS_ULUNITS_INF;
1178 
1179         for (ucp_lane_index_t lane = 0; lane < ucp_ep_num_lanes(ep); lane++) {
1180             uct_iface_attr_t *iface_attr = ucp_ep_get_iface_attr(ep, lane);
1181 
1182             if (iface_attr->max_num_eps < min_max_num_eps) {
1183                 min_max_num_eps = iface_attr->max_num_eps;
1184             }
1185         }
1186 
1187         return min_max_num_eps;
1188     }
1189 
test_wireup_fallback_amo(const std::vector<std::string> & tls,size_t est_num_eps,bool should_use_device_amo)1190     size_t test_wireup_fallback_amo(const std::vector<std::string> &tls,
1191                                     size_t est_num_eps, bool should_use_device_amo) {
1192         unsigned long min_max_num_eps = UCS_ULUNITS_INF;
1193 
1194         UCS_TEST_MESSAGE << "Testing " << est_num_eps << " number of EPs";
1195         modify_config("NUM_EPS", ucs::to_string(est_num_eps).c_str());
1196 
1197         // Create new entity and add to to the end of vector
1198         // (thus it will be receiver without any connections)
1199         create_entity(false);
1200 
1201         ucp_test_param params = GetParam();
1202         for (std::vector<std::string>::const_iterator i = tls.begin();
1203              i != tls.end(); ++i) {
1204             params.transports.clear();
1205             params.transports.push_back(*i);
1206             create_entity(true, params);
1207             sender().connect(&receiver(), get_ep_params());
1208 
1209             EXPECT_EQ(should_use_device_amo, use_device_amo(sender().ep()));
1210 
1211             size_t max_num_eps = get_min_max_num_eps(sender().ep());
1212             if (max_num_eps < min_max_num_eps) {
1213                 min_max_num_eps = max_num_eps;
1214             }
1215         }
1216 
1217         test_ucp_wireup::cleanup();
1218 
1219         return min_max_num_eps;
1220     }
1221 
1222 public:
1223 
get_ctx_params()1224     static ucp_params_t get_ctx_params() {
1225         ucp_params_t params = test_ucp_wireup::get_ctx_params();
1226         params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
1227         params.features    |= (UCP_FEATURE_AMO32 |
1228                                UCP_FEATURE_AMO64);
1229         return params;
1230     }
1231 };
1232 
1233 class test_ucp_wireup_amo : public test_ucp_wireup {
1234 public:
1235     typedef struct {
1236         test_ucp_wireup_amo *test;
1237     } request_t;
1238 
get_ctx_params()1239     static ucp_params_t get_ctx_params() {
1240         ucp_params_t params = test_ucp_wireup::get_ctx_params();
1241         params.field_mask  |= UCP_PARAM_FIELD_REQUEST_SIZE;
1242         params.request_size = sizeof(request_t);
1243         return params;
1244     }
1245 
1246     static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)1247     enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
1248                      const std::string& test_case_name, const std::string& tls)
1249     {
1250         uint64_t amo_features;
1251 
1252         EXPECT_TRUE((sizeof(elem_type) == 4ul) || (sizeof(elem_type) == 8ul));
1253         amo_features = (sizeof(elem_type) == 4ul) ? UCP_FEATURE_AMO32 :
1254                        UCP_FEATURE_AMO64;
1255         return enum_test_params_features(ctx_params, name, test_case_name, tls,
1256                                          amo_features, false);
1257     }
1258 
1259 protected:
get_rkey(const entity & e)1260     ucp_rkey_h get_rkey(const entity &e) {
1261         if (&sender() == &e) {
1262             return test_ucp_wireup::get_rkey(e.ep(), m_memh_receiver);
1263         } else if (&receiver() == &e) {
1264             return test_ucp_wireup::get_rkey(e.ep(), m_memh_sender);
1265         }
1266 
1267         return NULL;
1268     }
1269 
add_rkey(ucp_rkey_h rkey)1270     void add_rkey(ucp_rkey_h rkey) {
1271         ASSERT_NE((ucp_rkey_h)NULL, rkey);
1272         m_rkeys.push_back(ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy));
1273     }
1274 
fill_send_data()1275     void fill_send_data() {
1276         m_send_data[0] = ucs_generate_uuid(0);
1277     }
1278 
flush_cb(void * req,ucs_status_t status)1279     static void flush_cb(void *req, ucs_status_t status) {
1280         request_t *request = (request_t *)req;
1281 
1282         ASSERT_UCS_OK(status);
1283         request->test->rkeys_cleanup();
1284         request->test->memhs_cleanup();
1285     }
1286 };
1287 
/*
 * Post an atomic add right after connecting, then flush the endpoint. When
 * the flush returns a request, its completion callback (flush_cb) releases
 * the rkeys and memory handles.
 */
UCS_TEST_P(test_ucp_wireup_amo, relese_key_after_flush) {
    fill_send_data();
    clear_recv_data();

    sender().connect(&receiver(), get_ep_params());

    ucp_rkey_h rkey = get_rkey(sender());
    add_rkey(rkey);

    ucs_status_t status = ucp_atomic_post(sender().ep(), UCP_ATOMIC_POST_OP_ADD,
                                          m_send_data[0], sizeof(elem_type),
                                          (uint64_t)&m_recv_data[0], rkey);
    ASSERT_UCS_OK(status);

    request_t *req = (request_t *)ucp_ep_flush_nb(sender().ep(), 0, flush_cb);
    if (!UCS_PTR_IS_PTR(req)) {
        /* not a request pointer: the returned status must be OK */
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    } else {
        /* let flush_cb find the test object, then wait for completion */
        req->test = this;
        wait(req);
    }
}
1309 
1310 UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_amo)
1311 
/*
 * Connect peers with different transport sets and check that device atomics
 * are used with one estimated endpoint, but not once the estimated number of
 * endpoints exceeds the smallest max_num_eps reported by the first run.
 */
UCS_TEST_P(test_ucp_wireup_fallback_amo, different_amo_types) {
    static const char *peer_tls[] = {
        "rc",    /* the 1st peer supports RC only (device atomics) */
        "rc,shm" /* the 2nd peer supports RC and SHM (device and CPU atomics) */
    };
    const std::vector<std::string> tls(peer_tls,
                                       peer_tls + (sizeof(peer_tls) /
                                                   sizeof(peer_tls[0])));

    const size_t min_max_num_eps = test_wireup_fallback_amo(tls, 1, true);

    test_wireup_fallback_amo(tls, min_max_num_eps + 1, false);
}
1323 
1324 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback_amo,
1325                               shm_rc, "shm,rc_x,rc_v")
1326 
1327 /* NOTE: this fixture is NOT inherited from test_ucp_wireup, because we want to
1328  * create our own entities.
1329  */
1330 class test_ucp_wireup_asymmetric : public ucp_test {
1331 protected:
init()1332     virtual void init() {
1333         static const char *ibdev_sysfs_dir = "/sys/class/infiniband";
1334 
1335         DIR *dir = opendir(ibdev_sysfs_dir);
1336         if (dir == NULL) {
1337             UCS_TEST_SKIP_R(std::string(ibdev_sysfs_dir) + " not found");
1338         }
1339 
1340         for (;;) {
1341             struct dirent *entry = readdir(dir);
1342             if (entry == NULL) {
1343                 break;
1344             }
1345 
1346             if (entry->d_name[0] == '.') {
1347                 continue;
1348             }
1349 
1350             m_ib_devices.push_back(entry->d_name);
1351         }
1352 
1353         closedir(dir);
1354     }
1355 
tag_sendrecv(size_t size)1356     void tag_sendrecv(size_t size) {
1357         std::string send_data(size, 's');
1358         std::string recv_data(size, 'x');
1359 
1360         ucs_status_ptr_t sreq = ucp_tag_send_nb(
1361                         sender().ep(0), &send_data[0], size,
1362                         ucp_dt_make_contig(1), 1,
1363                         (ucp_send_callback_t)ucs_empty_function);
1364         ucs_status_ptr_t rreq = ucp_tag_recv_nb(
1365                         receiver().worker(), &recv_data[0], size,
1366                         ucp_dt_make_contig(1), 1, 1,
1367                         (ucp_tag_recv_callback_t)ucs_empty_function);
1368         wait(sreq);
1369         wait(rreq);
1370 
1371         EXPECT_EQ(send_data, recv_data);
1372     }
1373 
1374     /* Generate a pci_bw configuration string for IB devices, which assigns
1375      * the speed ai+b for device i.
1376      */
pci_bw_config(int a,int b)1377     std::string pci_bw_config(int a, int b) {
1378         std::string config_str;
1379         for (size_t i = 0; i < m_ib_devices.size(); ++i) {
1380             config_str += m_ib_devices[i] + ":" +
1381                             ucs::to_string((a * i) + b) + "Gbps";
1382             if (i != (m_ib_devices.size() - 1)) {
1383                 config_str += ",";
1384             }
1385         }
1386         return config_str;
1387     }
1388 
1389     std::vector<std::string> m_ib_devices;
1390 
1391 public:
get_ctx_params()1392     static ucp_params_t get_ctx_params() {
1393         ucp_params_t params = ucp_test::get_ctx_params();
1394         params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
1395         params.features     = UCP_FEATURE_TAG;
1396         return params;
1397     }
1398 };
1399 
1400 /*
1401  * Force asymmetric configuration by different PCI_BW settings
1402  */
UCS_TEST_SKIP_COND_P(test_ucp_wireup_asymmetric,connect,is_self ())1403 UCS_TEST_SKIP_COND_P(test_ucp_wireup_asymmetric, connect, is_self()) {
1404 
1405     /* Enable cross-dev connection */
1406     /* coverity[tainted_string_argument] */
1407     ucs::scoped_setenv path_mtu_env("UCX_RC_PATH_MTU", "1024");
1408 
1409     {
1410         std::string config_str = pci_bw_config(20, 20);
1411         UCS_TEST_MESSAGE << "creating sender: " << config_str;
1412         /* coverity[tainted_string_argument] */
1413         ucs::scoped_setenv pci_bw_env("UCX_IB_PCI_BW", config_str.c_str());
1414         create_entity();
1415     }
1416 
1417     {
1418         std::string config_str = pci_bw_config(-20, m_ib_devices.size() * 20);
1419         UCS_TEST_MESSAGE << "creating receiver: " << config_str;
1420         /* coverity[tainted_string_argument] */
1421         ucs::scoped_setenv pci_bw_env("UCX_IB_PCI_BW", config_str.c_str());
1422         create_entity();
1423     }
1424 
1425     sender().connect(&receiver(), get_ep_params());
1426     receiver().connect(&sender(), get_ep_params());
1427 
1428     ucp_ep_print_info(sender().ep(), stdout);
1429     ucp_ep_print_info(receiver().ep(), stdout);
1430 
1431     tag_sendrecv(1);
1432     tag_sendrecv(100000);
1433     tag_sendrecv(1000000);
1434 }
1435 
1436 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, rcv, "rc_v")
1437 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, rcx, "rc_x")
1438 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, ib, "ib")
1439