1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include "ucp_test.h"
8 #include "common/test.h"
9 #include "ucp/ucp_test.h"
10
11 #include <algorithm>
12 #include <set>
13
14 extern "C" {
15 #include <ucp/wireup/address.h>
16 #include <ucp/core/ucp_ep.inl>
17 #include <ucs/sys/math.h>
18 }
19
20 class test_ucp_wireup : public ucp_test {
21 public:
22 static std::vector<ucp_test_param>
23 enum_test_params_features(const ucp_params_t& ctx_params,
24 const std::string& name,
25 const std::string& test_case_name,
26 const std::string& tls,
27 uint64_t features, bool test_all = 0);
28
29 protected:
30 enum {
31 TEST_RMA = UCS_BIT(0),
32 TEST_TAG = UCS_BIT(1),
33 TEST_STREAM = UCS_BIT(2),
34 UNIFIED_MODE = UCS_BIT(3),
35 TEST_AMO = UCS_BIT(4)
36 };
37
38 typedef uint64_t elem_type;
39 typedef std::vector<elem_type> vec_type;
40
41 static const size_t BUFFER_LENGTH = 16384;
42 static const ucp_datatype_t DT_U64 = ucp_dt_make_contig(sizeof(elem_type));
43 static const uint64_t TAG = 0xdeadbeef;
44 static const elem_type SEND_DATA = 0xdeadbeef12121212ull;
45
46 virtual void init();
47 virtual void cleanup();
48
49 void send_nb(ucp_ep_h ep, size_t length, int repeat, std::vector<void*>& reqs,
50 uint64_t send_data = SEND_DATA);
51
52 void send_b(ucp_ep_h ep, size_t length, int repeat,
53 uint64_t send_data = SEND_DATA);
54
55 void recv_b(ucp_worker_h worker, ucp_ep_h ep, size_t length, int repeat,
56 uint64_t recv_data = SEND_DATA);
57
58 void send_recv(ucp_ep_h send_ep, ucp_worker_h recv_worker, ucp_ep_h recv_ep,
59 size_t vecsize, int repeat);
60
61 void waitall(std::vector<void*> reqs);
62
63 void disconnect(ucp_ep_h ep);
64
65 void disconnect(ucp_test::entity &e);
66
67 static void send_completion(void *request, ucs_status_t status);
68
69 static void tag_recv_completion(void *request, ucs_status_t status,
70 ucp_tag_recv_info_t *info);
71
72 void rkeys_cleanup();
73
74 void memhs_cleanup();
75
76 void clear_recv_data();
77
78 void fill_send_data();
79
80 ucp_rkey_h get_rkey(ucp_ep_h ep, ucp_mem_h memh);
81
82 protected:
83 vec_type m_send_data;
84 vec_type m_recv_data;
85 ucs::handle<ucp_mem_h, ucp_context_h> m_memh_sender;
86 ucs::handle<ucp_mem_h, ucp_context_h> m_memh_receiver;
87 std::vector< ucs::handle<ucp_rkey_h> > m_rkeys;
88
89 private:
90 static void stream_recv_completion(void *request, ucs_status_t status,
91 size_t length);
92
93 static void unmap_memh(ucp_mem_h memh, ucp_context_h context);
94 };
95
96 std::vector<ucp_test_param>
enum_test_params_features(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls,uint64_t features,bool test_all)97 test_ucp_wireup::enum_test_params_features(const ucp_params_t& ctx_params,
98 const std::string& name,
99 const std::string& test_case_name,
100 const std::string& tls,
101 uint64_t features, bool test_all)
102 {
103 std::vector<ucp_test_param> result;
104 ucp_params_t tmp_ctx_params = ctx_params;
105
106 if (features & UCP_FEATURE_RMA) {
107 tmp_ctx_params.features = UCP_FEATURE_RMA;
108 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/rma",
109 tls, TEST_RMA, result);
110
111 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/rma",
112 tls, TEST_RMA | UNIFIED_MODE, result);
113 }
114
115 if (features & UCP_FEATURE_TAG) {
116 tmp_ctx_params.features = UCP_FEATURE_TAG;
117 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/tag",
118 tls, TEST_TAG, result);
119
120 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/tag",
121 tls, TEST_TAG | UNIFIED_MODE, result);
122 }
123
124 if (features & UCP_FEATURE_STREAM) {
125 tmp_ctx_params.features = UCP_FEATURE_STREAM;
126 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/stream",
127 tls, TEST_STREAM, result);
128
129 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/stream",
130 tls, TEST_STREAM | UNIFIED_MODE, result);
131 }
132
133 if (features & (UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64)) {
134 tmp_ctx_params.features = (UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64);
135 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/amo",
136 tls, TEST_AMO, result);
137 }
138
139 if (test_all) {
140 uint64_t all_flags = (TEST_TAG | TEST_RMA | TEST_STREAM);
141 tmp_ctx_params.features = features;
142 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/all",
143 tls, all_flags, result);
144
145 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/all",
146 tls, all_flags | UNIFIED_MODE, result);
147 }
148
149 return result;
150 }
151
unmap_memh(ucp_mem_h memh,ucp_context_h context)152 void test_ucp_wireup::unmap_memh(ucp_mem_h memh, ucp_context_h context)
153 {
154 ucs_status_t status = ucp_mem_unmap(context, memh);
155 if (status != UCS_OK) {
156 ucs_warn("failed to unmap memory: %s", ucs_status_string(status));
157 }
158 }
159
init()160 void test_ucp_wireup::init()
161 {
162 if (GetParam().variant & UNIFIED_MODE) {
163 modify_config("UNIFIED_MODE", "y");
164 }
165
166 ucp_test::init();
167
168 m_send_data.resize(BUFFER_LENGTH, 0);
169 m_recv_data.resize(BUFFER_LENGTH, 0);
170
171 if (GetParam().variant & (TEST_RMA | TEST_AMO)) {
172 ucs_status_t status;
173 ucp_mem_map_params_t params;
174 ucp_mem_h memh;
175
176 params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
177 UCP_MEM_MAP_PARAM_FIELD_LENGTH |
178 UCP_MEM_MAP_PARAM_FIELD_FLAGS;
179 params.address = &m_recv_data[0];
180 params.length = m_recv_data.size() * sizeof(m_recv_data[0]);
181 params.flags = 0;
182
183 status = ucp_mem_map(sender().ucph(), ¶ms, &memh);
184 ASSERT_UCS_OK(status);
185 m_memh_sender.reset(memh, unmap_memh, sender().ucph());
186
187 status = ucp_mem_map(receiver().ucph(), ¶ms, &memh);
188 ASSERT_UCS_OK(status);
189 m_memh_receiver.reset(memh, unmap_memh, receiver().ucph());
190 }
191 }
192
get_rkey(ucp_ep_h ep,ucp_mem_h memh)193 ucp_rkey_h test_ucp_wireup::get_rkey(ucp_ep_h ep, ucp_mem_h memh)
194 {
195 void *rkey_buffer;
196 size_t rkey_size;
197 ucs_status_t status;
198 ucp_rkey_h rkey;
199
200 if (memh == m_memh_receiver) {
201 status = ucp_rkey_pack(receiver().ucph(), memh, &rkey_buffer, &rkey_size);
202 } else if (memh == m_memh_sender) {
203 status = ucp_rkey_pack(sender().ucph(), memh, &rkey_buffer, &rkey_size);
204 } else {
205 status = UCS_ERR_INVALID_PARAM;
206 }
207 ASSERT_UCS_OK(status);
208
209 status = ucp_ep_rkey_unpack(ep, rkey_buffer, &rkey);
210 ASSERT_UCS_OK(status);
211
212 ucp_rkey_buffer_release(rkey_buffer);
213
214 return rkey;
215 }
216
rkeys_cleanup()217 void test_ucp_wireup::rkeys_cleanup() {
218 m_rkeys.clear();
219 }
220
memhs_cleanup()221 void test_ucp_wireup::memhs_cleanup() {
222 m_memh_sender.reset();
223 m_memh_receiver.reset();
224 }
225
cleanup()226 void test_ucp_wireup::cleanup() {
227 rkeys_cleanup();
228 memhs_cleanup();
229 ucp_test::cleanup();
230 }
231
clear_recv_data()232 void test_ucp_wireup::clear_recv_data() {
233 std::fill(m_recv_data.begin(), m_recv_data.end(), 0);
234 }
235
send_nb(ucp_ep_h ep,size_t length,int repeat,std::vector<void * > & reqs,uint64_t send_data)236 void test_ucp_wireup::send_nb(ucp_ep_h ep, size_t length, int repeat,
237 std::vector<void*>& reqs, uint64_t send_data)
238 {
239 if (GetParam().variant & TEST_TAG) {
240 std::fill(m_send_data.begin(), m_send_data.end(), send_data);
241 for (int i = 0; i < repeat; ++i) {
242 void *req = ucp_tag_send_nb(ep, &m_send_data[0], length,
243 DT_U64, TAG, send_completion);
244 if (UCS_PTR_IS_PTR(req)) {
245 reqs.push_back(req);
246 } else {
247 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
248 }
249 }
250 } else if (GetParam().variant & TEST_STREAM) {
251 std::fill(m_send_data.begin(), m_send_data.end(), send_data);
252 for (int i = 0; i < repeat; ++i) {
253 void *req = ucp_stream_send_nb(ep, &m_send_data[0], length, DT_U64,
254 send_completion, 0);
255 if (UCS_PTR_IS_PTR(req)) {
256 reqs.push_back(req);
257 } else {
258 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
259 }
260 }
261 } else if (GetParam().variant & TEST_RMA) {
262 clear_recv_data();
263
264 ucp_mem_h memh = (sender().ucph() == ep->worker->context) ?
265 m_memh_receiver : m_memh_sender;
266 ucp_rkey_h rkey = get_rkey(ep, memh);
267
268 m_rkeys.push_back(ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy));
269
270 for (int i = 0; i < repeat; ++i) {
271 std::fill(m_send_data.begin(), m_send_data.end(), send_data + i);
272 void *req = ucp_put_nb(ep, &m_send_data[0],
273 m_send_data.size() * sizeof(m_send_data[0]),
274 (uintptr_t)&m_recv_data[0], rkey,
275 send_completion);
276 if (UCS_PTR_IS_PTR(req)) {
277 reqs.push_back(req);
278 } else {
279 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
280 }
281 }
282 }
283 }
284
send_b(ucp_ep_h ep,size_t length,int repeat,uint64_t send_data)285 void test_ucp_wireup::send_b(ucp_ep_h ep, size_t length, int repeat,
286 uint64_t send_data)
287 {
288 std::vector<void*> reqs;
289 send_nb(ep, length, repeat, reqs, send_data);
290 waitall(reqs);
291 }
292
recv_b(ucp_worker_h worker,ucp_ep_h ep,size_t length,int repeat,uint64_t recv_data)293 void test_ucp_wireup::recv_b(ucp_worker_h worker, ucp_ep_h ep, size_t length,
294 int repeat, uint64_t recv_data)
295 {
296 if (GetParam().variant & (TEST_TAG | TEST_STREAM)) {
297 for (int i = 0; i < repeat; ++i) {
298 size_t recv_length;
299 void *req;
300
301 clear_recv_data();
302 if (GetParam().variant & TEST_TAG) {
303 req = ucp_tag_recv_nb(worker, &m_recv_data[0], length, DT_U64,
304 TAG, (ucp_tag_t)-1, tag_recv_completion);
305 } else if (GetParam().variant & TEST_STREAM) {
306 req = ucp_stream_recv_nb(ep, &m_recv_data[0], length, DT_U64,
307 stream_recv_completion, &recv_length,
308 UCP_STREAM_RECV_FLAG_WAITALL);
309 } else {
310 req = NULL;
311 }
312 if (UCS_PTR_IS_PTR(req)) {
313 wait(req);
314 } else {
315 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
316 }
317 EXPECT_EQ(recv_data, m_recv_data[0])
318 << "repeat " << i << "/" << repeat;
319 EXPECT_EQ(length,
320 (size_t)std::count(m_recv_data.begin(),
321 m_recv_data.begin() + length,
322 recv_data));
323 }
324 } else if (GetParam().variant & TEST_RMA) {
325 for (size_t i = 0; i < length; ++i) {
326 while (m_recv_data[i] != recv_data + repeat - 1) {
327 progress();
328 }
329 }
330 }
331 }
332
/* Send completion callback; intentionally empty - callers wait on the
 * returned requests instead */
void test_ucp_wireup::send_completion(void *request, ucs_status_t status)
{
}
336
/* Tag receive completion callback; intentionally empty */
void test_ucp_wireup::tag_recv_completion(void *request, ucs_status_t status,
                                          ucp_tag_recv_info_t *info)
{
}
341
/* Stream receive completion callback; intentionally empty */
void test_ucp_wireup::stream_recv_completion(void *request, ucs_status_t status,
                                             size_t length)
{
}
346
send_recv(ucp_ep_h send_ep,ucp_worker_h recv_worker,ucp_ep_h recv_ep,size_t length,int repeat)347 void test_ucp_wireup::send_recv(ucp_ep_h send_ep, ucp_worker_h recv_worker,
348 ucp_ep_h recv_ep, size_t length, int repeat)
349 {
350 std::vector<void*> send_reqs;
351 static uint64_t next_send_data = 0;
352 uint64_t send_data = next_send_data++;
353
354 send_nb(send_ep, length, repeat, send_reqs, send_data);
355 recv_b (recv_worker, recv_ep, length, repeat, send_data);
356 waitall(send_reqs);
357 m_rkeys.clear();
358 }
359
disconnect(ucp_ep_h ep)360 void test_ucp_wireup::disconnect(ucp_ep_h ep) {
361 void *req = ucp_disconnect_nb(ep);
362 if (!UCS_PTR_IS_PTR(req)) {
363 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
364 }
365 wait(req);
366 }
367
disconnect(ucp_test::entity & e)368 void test_ucp_wireup::disconnect(ucp_test::entity &e) {
369 disconnect(e.revoke_ep());
370 }
371
waitall(std::vector<void * > reqs)372 void test_ucp_wireup::waitall(std::vector<void*> reqs)
373 {
374 while (!reqs.empty()) {
375 wait(reqs.back());
376 reqs.pop_back();
377 }
378 }
379
380 class test_ucp_wireup_1sided : public test_ucp_wireup {
381 public:
382 static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)383 enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
384 const std::string& test_case_name, const std::string& tls)
385 {
386 return enum_test_params_features(ctx_params, name, test_case_name, tls,
387 UCP_FEATURE_RMA | UCP_FEATURE_TAG);
388 }
389
test_ucp_wireup_1sided()390 test_ucp_wireup_1sided() {
391 for (ucp_lane_index_t i = 0; i < UCP_MAX_LANES; ++i) {
392 m_lanes2remote[i] = i;
393 }
394 }
395
396 ucp_lane_index_t m_lanes2remote[UCP_MAX_LANES];
397 };
398
/* Pack the full worker address, unpack it again, and check that uuid, name
 * (debug builds), address count and the set of device priorities survive */
UCS_TEST_P(test_ucp_wireup_1sided, address) {
    const ucp_worker_h worker = sender().worker();
    std::set<uint8_t> packed_dev_priorities, unpacked_dev_priorities;
    size_t size;
    void *buffer;

    ucs_status_t status = ucp_address_pack(worker, NULL,
                                           std::numeric_limits<uint64_t>::max(),
                                           UCP_ADDRESS_PACK_FLAGS_ALL,
                                           m_lanes2remote, &size, &buffer);
    ASSERT_UCS_OK(status);
    ASSERT_TRUE(buffer != NULL);
    ASSERT_GT(size, 0ul);
    EXPECT_LE(size, 2048ul); /* Expect a reasonable address size */

    /* Collect the priorities of all resources except the sockaddr ones */
    ucp_rsc_index_t tl;
    ucs_for_each_bit(tl, worker->context->tl_bitmap) {
        if (worker->context->tl_rscs[tl].flags & UCP_TL_RSC_FLAG_SOCKADDR) {
            continue;
        }
        packed_dev_priorities.insert(ucp_worker_iface_get_attr(worker, tl)->priority);
    }

    ucp_unpacked_address unpacked_address;

    status = ucp_address_unpack(worker, buffer, UCP_ADDRESS_PACK_FLAGS_ALL,
                                &unpacked_address);
    ASSERT_UCS_OK(status);

    EXPECT_EQ(worker->uuid, unpacked_address.uuid);
#if ENABLE_DEBUG_DATA
    EXPECT_EQ(std::string(ucp_worker_get_name(worker)),
              std::string(unpacked_address.name));
#endif
    EXPECT_LE(unpacked_address.address_count,
              static_cast<unsigned>(sender().ucph()->num_tls));

    const ucp_address_entry_t *ae;
    ucp_unpacked_address_for_each(ae, &unpacked_address) {
        unpacked_dev_priorities.insert(ae->iface_attr.priority);
    }

    /* TODO test addresses */

    /* Free before the final assertion, so its failure does not leak */
    ucs_free(unpacked_address.address_list);
    ucs_free(buffer);

    /* Make sure that the packed device priorities are equal to the unpacked
     * device priorities */
    ASSERT_TRUE(packed_dev_priorities == unpacked_dev_priorities);
}
449
450 UCS_TEST_P(test_ucp_wireup_1sided, ep_address, "IB_NUM_PATHS?=2") {
451 ucs_status_t status;
452 size_t size;
453 void *buffer;
454
455 sender().connect(&receiver(), get_ep_params());
456
457 status = ucp_address_pack(sender().worker(), sender().ep(),
458 std::numeric_limits<uint64_t>::max(),
459 UCP_ADDRESS_PACK_FLAGS_ALL, m_lanes2remote, &size,
460 &buffer);
461 ASSERT_UCS_OK(status);
462 ASSERT_TRUE(buffer != NULL);
463
464 ucp_unpacked_address unpacked_address;
465
466 status = ucp_address_unpack(sender().worker(), buffer,
467 UCP_ADDRESS_PACK_FLAGS_ALL, &unpacked_address);
468 ASSERT_UCS_OK(status);
469
470 EXPECT_EQ(sender().worker()->uuid, unpacked_address.uuid);
471 EXPECT_LE(unpacked_address.address_count,
472 static_cast<unsigned>(sender().ucph()->num_tls));
473
474 ucs_free(unpacked_address.address_list);
475 ucs_free(buffer);
476 }
477
/* Pack an address with an empty resource bitmap: the result is still a valid
 * buffer, which unpacks to the worker uuid/name and zero addresses */
UCS_TEST_P(test_ucp_wireup_1sided, empty_address) {
    const ucp_worker_h worker = sender().worker();
    size_t size;
    void *buffer;

    ucs_status_t status = ucp_address_pack(worker, NULL, 0,
                                           UCP_ADDRESS_PACK_FLAGS_ALL,
                                           m_lanes2remote, &size, &buffer);
    ASSERT_UCS_OK(status);
    ASSERT_TRUE(buffer != NULL);
    ASSERT_GT(size, 0ul);

    ucp_unpacked_address unpacked_address;

    status = ucp_address_unpack(worker, buffer, UCP_ADDRESS_PACK_FLAGS_ALL,
                                &unpacked_address);
    ASSERT_UCS_OK(status);

    EXPECT_EQ(worker->uuid, unpacked_address.uuid);
#if ENABLE_DEBUG_DATA
    EXPECT_EQ(std::string(ucp_worker_get_name(worker)),
              std::string(unpacked_address.name));
#endif
    EXPECT_EQ(0u, unpacked_address.address_count);

    ucs_free(unpacked_address.address_list);
    ucs_free(buffer);
}
506
/* Only the sender connects; a single one-element message must get through */
UCS_TEST_P(test_ucp_wireup_1sided, one_sided_wireup) {
    ucp_test::entity &src = sender();
    ucp_test::entity &dst = receiver();

    src.connect(&dst, get_ep_params());
    send_recv(src.ep(), dst.worker(), dst.ep(), 1, 1);
    flush_worker(src);
}
512
513 UCS_TEST_P(test_ucp_wireup_1sided, one_sided_wireup_rndv, "RNDV_THRESH=1") {
514 sender().connect(&receiver(), get_ep_params());
515 send_recv(sender().ep(), receiver().worker(), receiver().ep(), BUFFER_LENGTH, 1);
516 if (is_loopback() && (GetParam().variant & TEST_TAG)) {
517 /* expect the endpoint to be connected to itself */
518 ucp_ep_h ep = sender().ep();
519 EXPECT_EQ((uintptr_t)ep, ucp_ep_dest_ep_ptr(ep));
520 }
521 flush_worker(sender());
522 }
523
/* Connect the sender to 10 entities, each through its own endpoint index */
UCS_TEST_P(test_ucp_wireup_1sided, multi_wireup) {
    skip_loopback();

    const size_t num_entities = 10;
    while (entities().size() < num_entities) {
        create_entity();
    }

    /* connect from sender() to all the rest */
    size_t peer = 0;
    while (peer < num_entities) {
        sender().connect(&entities().at(peer), get_ep_params(), peer);
        ++peer;
    }
}
537
/* 30 rounds of connect / many transfers / disconnect; the number of transfers
 * is scaled down by the square of the test time multiplier */
UCS_TEST_P(test_ucp_wireup_1sided, stress_connect) {
    const int num_rounds = 30;

    for (int round = 0; round < num_rounds; ++round) {
        sender().connect(&receiver(), get_ep_params());
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1,
                  10000 / (ucs::test_time_multiplier() *
                           ucs::test_time_multiplier()));

        /* The receiver gets an endpoint of its own only after the transfer */
        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }

        disconnect(sender());

        if (!is_loopback()) {
            disconnect(receiver());
        }
    }
}
554
/* Many rounds of connect / single transfer / disconnect. The round count is
 * scaled by the test time multiplier (at least 10) and capped by half of
 * max_connections() */
UCS_TEST_P(test_ucp_wireup_1sided, stress_connect2) {
    int max_count = (int)ucs_max(10,
                                 (1000.0 / (ucs::test_time_multiplier() *
                                            ucs::test_time_multiplier())));
    int count     = ucs_min(max_count, max_connections() / 2);

    int round = 0;
    while (round < count) {
        sender().connect(&receiver(), get_ep_params());
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);

        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }

        disconnect(sender());

        if (!is_loopback()) {
            disconnect(receiver());
        }

        ++round;
    }
}
574
/* Connect and disconnect without any traffic, then destroy both workers
 * (receiver first) */
UCS_TEST_P(test_ucp_wireup_1sided, disconnect_nonexistent) {
    skip_loopback();

    ucp_test::entity &src = sender();
    ucp_test::entity &dst = receiver();

    src.connect(&dst, get_ep_params());
    disconnect(src);

    dst.destroy_worker();
    src.destroy_worker();
}
582
/* Connect, send, disconnect and only then receive - twice in a row, so the
 * second round goes over a re-created endpoint */
UCS_TEST_P(test_ucp_wireup_1sided, disconnect_reconnect) {
    const int    num_rounds = 2;
    const size_t msg_length = 1000;

    for (int round = 0; round < num_rounds; ++round) {
        sender().connect(&receiver(), get_ep_params());
        send_b(sender().ep(), msg_length, 1);
        disconnect(sender());
        recv_b(receiver().worker(), receiver().ep(), msg_length, 1);
    }
}
594
/* Send 100 messages, disconnect the sender, and only then receive them */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided) {
    const size_t msg_length = 1000;
    const int    num_msgs   = 100;

    ucp_test::entity &src = sender();
    ucp_test::entity &dst = receiver();

    src.connect(&dst, get_ep_params());
    send_b(src.ep(), msg_length, num_msgs);
    disconnect(src);
    recv_b(dst.worker(), dst.ep(), msg_length, num_msgs);
}
601
602 UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided_nozcopy, "ZCOPY_THRESH=-1") {
603 sender().connect(&receiver(), get_ep_params());
604 send_b(sender().ep(), 1000, 100);
605 disconnect(sender());
606 recv_b(receiver().worker(), receiver().ep(), 1000, 100);
607 }
608
/* Complete one full transfer first, then send 200 messages, disconnect the
 * sender and receive afterwards */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided_wait) {
    const size_t msg_length = 1000;
    const int    num_msgs   = 200;

    ucp_test::entity &src = sender();
    ucp_test::entity &dst = receiver();

    src.connect(&dst, get_ep_params());
    send_recv(src.ep(), dst.worker(), dst.ep(), 8, 1);

    send_b(src.ep(), msg_length, num_msgs);
    disconnect(src);
    recv_b(dst.worker(), dst.ep(), msg_length, num_msgs);
}
616
/* Both sides connect up front. The sender sends and disconnects, the receiver
 * gets the message, replies and disconnects, and the reply is received last */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_reply1) {
    const bool two_entities = !is_loopback();

    sender().connect(&receiver(), get_ep_params());
    if (two_entities) {
        receiver().connect(&sender(), get_ep_params());
    }

    send_b(sender().ep(), 8, 1);
    if (two_entities) {
        disconnect(sender());
    }

    recv_b(receiver().worker(), receiver().ep(), 8, 1);
    send_b(receiver().ep(), 8, 1);
    disconnect(receiver());
    recv_b(sender().worker(), sender().ep(), 8, 1);
}
633
/* Like send_disconnect_reply1, but the receiver creates its endpoint only
 * after the sender has already disconnected */
UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_reply2) {
    const bool two_entities = !is_loopback();

    sender().connect(&receiver(), get_ep_params());

    send_b(sender().ep(), 8, 1);
    if (two_entities) {
        disconnect(sender());
    }
    recv_b(receiver().worker(), receiver().ep(), 8, 1);

    if (two_entities) {
        receiver().connect(&sender(), get_ep_params());
    }

    send_b(receiver().ep(), 8, 1);
    disconnect(receiver());

    /* NOTE(review): the endpoint argument is only used by recv_b() for the
     * stream variant, which this fixture does not instantiate */
    recv_b(sender().worker(), receiver().ep(), 8, 1);
}
651
/* Start 1000 sends, immediately begin a non-blocking disconnect, and progress
 * until it completes (or the deadline passes) before receiving the data */
UCS_TEST_P(test_ucp_wireup_1sided, disconnect_nb_onesided) {
    const size_t msg_length = 1000;
    const int    num_msgs   = 1000;

    sender().connect(&receiver(), get_ep_params());

    std::vector<void*> send_reqs;
    send_nb(sender().ep(), msg_length, num_msgs, send_reqs);

    void *close_req           = sender().disconnect_nb();
    const ucs_time_t deadline = ucs::get_deadline();
    for (;;) {
        if (is_request_completed(close_req) || !(ucs_get_time() < deadline)) {
            break;
        }
        progress();
    }

    sender().close_ep_req_free(close_req);

    recv_b(receiver().worker(), receiver().ep(), msg_length, num_msgs);
    waitall(send_reqs);
}
669
/* Create 10 sender endpoints to the same receiver, then transfer data over
 * each one of them */
UCS_TEST_P(test_ucp_wireup_1sided, multi_ep_1sided) {
    const unsigned num_eps = 10;
    unsigned ep_idx;

    /* All the endpoints are created before any data is sent */
    for (ep_idx = 0; ep_idx < num_eps; ++ep_idx) {
        sender().connect(&receiver(), get_ep_params(), ep_idx);
    }

    for (ep_idx = 0; ep_idx < num_eps; ++ep_idx) {
        send_recv(sender().ep(0, ep_idx), receiver().worker(),
                  receiver().ep(), 8, 1);
    }
}
681
/* Instantiate the test_ucp_wireup_1sided tests; the transport sets come from
 * the UCP_INSTANTIATE_TEST_CASE macro, which is not defined in this file */
UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_1sided)
683
684 class test_ucp_wireup_2sided : public test_ucp_wireup {
685 public:
686 static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)687 enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
688 const std::string& test_case_name, const std::string& tls)
689 {
690 return enum_test_params_features(ctx_params, name, test_case_name, tls,
691 UCP_FEATURE_RMA | UCP_FEATURE_TAG |
692 UCP_FEATURE_STREAM);
693 }
694
695 protected:
696 void test_connect_loopback(bool delay_before_connect, bool enable_loopback);
697 };
698
/* Both sides connect, then one message is exchanged in each direction */
UCS_TEST_P(test_ucp_wireup_2sided, two_sided_wireup) {
    ucp_test::entity &src = sender();
    ucp_test::entity &dst = receiver();

    src.connect(&dst, get_ep_params());
    if (!is_loopback()) {
        dst.connect(&src, get_ep_params());
    }

    /* sender -> receiver */
    send_recv(src.ep(), dst.worker(), dst.ep(), 1, 1);
    flush_worker(src);

    /* receiver -> sender */
    send_recv(dst.ep(), src.worker(), src.ep(), 1, 1);
    flush_worker(dst);
}
710
test_connect_loopback(bool delay_before_connect,bool enable_loopback)711 void test_ucp_wireup_2sided::test_connect_loopback(bool delay_before_connect,
712 bool enable_loopback) {
713 ucp_ep_params_t params = test_ucp_wireup::get_ep_params();
714 if (!enable_loopback) {
715 params.field_mask |= UCP_EP_PARAM_FIELD_FLAGS;
716 params.flags |= UCP_EP_PARAMS_FLAGS_NO_LOOPBACK;
717 }
718
719 for (int i = 0; i < 5; ++i) {
720 int base_index = i * 2;
721 sender().connect(&sender(), params, base_index);
722 ucp_ep_h ep1 = sender().ep(0, base_index);
723
724 if (delay_before_connect) {
725 /* let one side create ep */
726 short_progress_loop(0);
727 }
728
729 sender().connect(&sender(), params, base_index + 1);
730 ucp_ep_h ep2 = sender().ep(0, base_index + 1);
731
732 EXPECT_NE(ep1, ep2);
733
734 if (GetParam().variant & TEST_STREAM) {
735 uint64_t data1 = (base_index * 10) + 1;
736 uint64_t data2 = (base_index * 10) + 2;
737
738 send_b(ep1, 1, 1, data1);
739 send_b(ep2, 1, 1, data2);
740
741 if (enable_loopback) {
742 /* self-send - each ep receives what was sent on it */
743 recv_b(sender().worker(), ep1, 1, 1, data1);
744 recv_b(sender().worker(), ep2, 1, 1, data2);
745 } else {
746 /* cross-send - each ep receives what was sent on the other ep */
747 recv_b(sender().worker(), ep1, 1, 1, data2);
748 recv_b(sender().worker(), ep2, 1, 1, data1);
749 }
750 }
751 }
752 flush_worker(sender());
753 }
754
/* Self-connection with loopback enabled, no delay between the connects */
UCS_TEST_P(test_ucp_wireup_2sided, loopback) {
    const bool delay_before_connect = false;
    const bool enable_loopback      = true;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
758
/* Self-connection with loopback enabled and a delay between the connects */
UCS_TEST_P(test_ucp_wireup_2sided, loopback_with_delay) {
    const bool delay_before_connect = true;
    const bool enable_loopback      = true;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
762
/* Self-connection with loopback disabled, no delay between the connects */
UCS_TEST_P(test_ucp_wireup_2sided, no_loopback) {
    const bool delay_before_connect = false;
    const bool enable_loopback      = false;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
766
/* Self-connection with loopback disabled and a delay between the connects */
UCS_TEST_P(test_ucp_wireup_2sided, no_loopback_with_delay) {
    const bool delay_before_connect = true;
    const bool enable_loopback      = false;

    test_connect_loopback(delay_before_connect, enable_loopback);
}
770
771 UCS_TEST_SKIP_COND_P(test_ucp_wireup_2sided, async_connect,
772 !(GetParam().ctx_params.features & UCP_FEATURE_TAG)) {
773 sender().connect(&receiver(), get_ep_params());
774 ucp_ep_h send_ep = sender().ep();
775 std::vector<void *> reqs;
776
777 reqs.push_back(ucp_tag_send_nb(send_ep, NULL, 0, DT_U64, 1, send_completion));
778 EXPECT_FALSE(UCS_PTR_IS_ERR(reqs.back()));
779
780 ucs_time_t deadline = ucs::get_deadline();
781 /* waiting of async reply on wiriup without calling progress on receiver */
782 while(!(send_ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED) &&
783 (ucs_get_time() < deadline)) {
784 ucp_worker_progress(sender().worker());
785 ucp_worker_progress(receiver().worker());
786 }
787
788 reqs.push_back(ucp_tag_recv_nb(receiver().worker(), NULL, 0, DT_U64, 1,
789 (ucp_tag_t)-1, tag_recv_completion));
790 EXPECT_FALSE(UCS_PTR_IS_ERR(reqs.back()));
791 waitall(reqs);
792 }
793
/* Connect both sides and disconnect them without sending any data */
UCS_TEST_P(test_ucp_wireup_2sided, connect_disconnect) {
    const bool two_entities = !is_loopback();

    sender().connect(&receiver(), get_ep_params());
    if (two_entities) {
        receiver().connect(&sender(), get_ep_params());
    }

    disconnect(sender());
    if (two_entities) {
        disconnect(receiver());
    }
}
804
/* Four iterations; each one creates 10 new endpoint pairs between the sender
 * and the receiver and exchanges data in both directions over every pair */
UCS_TEST_P(test_ucp_wireup_2sided, multi_ep_2sided) {
    const unsigned num_iterations = 4;
    const unsigned count          = 10;

    for (unsigned iter = 0; iter < num_iterations; ++iter) {
        /* Index of the first endpoint pair created by this iteration */
        const unsigned first_ep_idx = iter * count;

        for (unsigned pair = 0; pair < count; ++pair) {
            const unsigned ep_idx = first_ep_idx + pair;

            sender().connect(&receiver(), get_ep_params(), ep_idx);
            if (!is_loopback()) {
                receiver().connect(&sender(), get_ep_params(), ep_idx);
            }

            UCS_TEST_MESSAGE << "iteration " << iter << " pair " << pair << ": " <<
                             sender().ep(0, ep_idx) << " <--> " << receiver().ep(0, ep_idx);
        }

        for (unsigned pair = 0; pair < count; ++pair) {
            const unsigned ep_idx = first_ep_idx + pair;

            send_recv(sender().ep(0, ep_idx), receiver().worker(),
                      receiver().ep(0, ep_idx), 8, 1);
            send_recv(receiver().ep(0, ep_idx), sender().worker(),
                      sender().ep(0, ep_idx), 8, 1);
        }

        short_progress_loop(0);
    }
}
833
/* Instantiate the test_ucp_wireup_2sided tests; the transport sets come from
 * the UCP_INSTANTIATE_TEST_CASE macro, which is not defined in this file */
UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_2sided)
835
836 class test_ucp_wireup_errh_peer : public test_ucp_wireup_1sided
837 {
838 public:
get_ep_params()839 virtual ucp_ep_params_t get_ep_params() {
840 ucp_ep_params_t params = test_ucp_wireup::get_ep_params();
841 params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
842 UCP_EP_PARAM_FIELD_ERR_HANDLER;
843 params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
844 params.err_handler.cb = err_cb;
845 params.err_handler.arg = NULL;
846 return params;
847 }
848
init()849 virtual void init() {
850 test_ucp_wireup::init();
851 skip_loopback();
852 }
853
err_cb(void *,ucp_ep_h,ucs_status_t)854 static void err_cb(void *, ucp_ep_h, ucs_status_t) {}
855 };
856
/* The receiver creates its endpoint before the sender connects and sends */
UCS_TEST_P(test_ucp_wireup_errh_peer, msg_after_ep_create) {
    ucp_test::entity &src = sender();
    ucp_test::entity &dst = receiver();

    dst.connect(&src, get_ep_params());

    src.connect(&dst, get_ep_params());
    send_recv(src.ep(), dst.worker(), dst.ep(), 1, 1);
    flush_worker(src);
}
864
/* The sender connects and sends before the receiver creates its endpoint;
 * afterwards the receiver connects and sends a message back */
UCS_TEST_P(test_ucp_wireup_errh_peer, msg_before_ep_create) {

    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
    flush_worker(sender());

    receiver().connect(&sender(), get_ep_params());

    /* The reply is received on the sender worker, so the receive endpoint
     * must be the sender's endpoint (same as in two_sided_wireup) */
    send_recv(receiver().ep(), sender().worker(), sender().ep(), 1, 1);
    flush_worker(receiver());
}
876
/* Instantiate the test_ucp_wireup_errh_peer tests; the transport sets come
 * from the UCP_INSTANTIATE_TEST_CASE macro, which is not defined in this file */
UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_errh_peer)
878
/**
 * Fixture for checking transport selection as a function of the estimated
 * number of endpoints (the NUM_EPS configuration value).
 *
 * init() and cleanup() are overridden to do nothing, because every call to
 * test_est_num_eps_fallback() runs the base fixture init/cleanup itself with
 * a different NUM_EPS value.
 */
class test_ucp_wireup_fallback : public test_ucp_wireup {
public:
    test_ucp_wireup_fallback() {
        m_num_lanes = 0;
    }

    /* All of TAG, RMA and STREAM, plus the combined "/all" entries */
    static std::vector<ucp_test_param>
    enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
                     const std::string& test_case_name, const std::string& tls)
    {
        return enum_test_params_features(ctx_params, name, test_case_name, tls,
                                         UCP_FEATURE_TAG | UCP_FEATURE_RMA |
                                         UCP_FEATURE_STREAM, 1);
    }

    void init() {
        /* do nothing */
    }

    void cleanup() {
        /* do nothing */
    }

    /**
     * Walk the worker resources in tl_bitmap order and check the worker's
     * scalable_tl_bitmap against each interface's max_num_eps.
     *
     * @return true as soon as a resource with max_num_eps >= est_num_eps is
     *         found (resources after it are not inspected), false if there
     *         is no such resource.
     */
    bool check_scalable_tls(const ucp_worker_h worker, size_t est_num_eps) {
        ucp_rsc_index_t rsc_index;

        ucs_for_each_bit(rsc_index, worker->context->tl_bitmap) {
            ucp_md_index_t md_index      = worker->context->tl_rscs[rsc_index].md_index;
            const uct_md_attr_t *md_attr = &worker->context->tl_mds[md_index].attr;

            if ((worker->context->tl_rscs[rsc_index].flags & UCP_TL_RSC_FLAG_AUX) ||
                (md_attr->cap.flags & UCT_MD_FLAG_SOCKADDR) ||
                (worker->context->tl_rscs[rsc_index].tl_rsc.dev_type == UCT_DEVICE_TYPE_ACC)) {
                // Skip TLs for wireup and CM and acceleration TLs
                continue;
            }

            if (ucp_worker_iface_get_attr(worker, rsc_index)->max_num_eps >= est_num_eps) {
                EXPECT_TRUE((worker->scalable_tl_bitmap & UCS_BIT(rsc_index)) != 0);
                return true;
            } else {
                EXPECT_TRUE((worker->scalable_tl_bitmap & UCS_BIT(rsc_index)) == 0);
            }
        }

        return false;
    }

    /**
     * Run one complete init / connect / transfer / cleanup cycle with NUM_EPS
     * set to @a est_num_eps and inspect the lanes of the sender endpoint.
     *
     * @param [in]  est_num_eps      Value for the NUM_EPS configuration.
     * @param [out] min_max_num_eps  Smallest max_num_eps among the inspected
     *                               lanes (UCS_ULUNITS_INF if none lowered
     *                               it); not complete when the lane scan
     *                               stops early.
     *
     * @return false if a scalable transport exists but some lane uses an
     *         interface with max_num_eps < est_num_eps; when only unscalable
     *         transports exist (and est_num_eps != 1), whether the lane count
     *         matches the one recorded for est_num_eps == 1; true otherwise.
     */
    bool test_est_num_eps_fallback(size_t est_num_eps,
                                   unsigned long &min_max_num_eps) {
        size_t num_lanes = 0;
        bool res         = true;
        bool has_only_unscalable;

        min_max_num_eps = UCS_ULUNITS_INF;

        UCS_TEST_MESSAGE << "Testing " << est_num_eps << " number of EPs";
        modify_config("NUM_EPS", ucs::to_string(est_num_eps).c_str());
        /* The config has to be modified before the entities are created */
        test_ucp_wireup::init();

        sender().connect(&receiver(), get_ep_params());
        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
        flush_worker(sender());

        has_only_unscalable = !check_scalable_tls(sender().worker(),
                                                  est_num_eps);

        for (ucp_lane_index_t lane = 0;
             lane < ucp_ep_num_lanes(sender().ep()); lane++) {
            uct_ep_h uct_ep = sender().ep()->uct_eps[lane];
            if (uct_ep == NULL) {
                continue;
            }

            uct_iface_attr_t iface_attr;
            ucs_status_t status = uct_iface_query(uct_ep->iface, &iface_attr);
            ASSERT_UCS_OK(status);

            num_lanes++;

            /* A scalable transport is available, so no lane may use an
             * interface that cannot hold est_num_eps endpoints */
            if (!has_only_unscalable && (iface_attr.max_num_eps < est_num_eps)) {
                res = false;
                goto out;
            }

            if (iface_attr.max_num_eps < min_max_num_eps) {
                min_max_num_eps = iface_attr.max_num_eps;
            }
        }

    out:
        /* Reached on both the normal and the early-exit path */
        test_ucp_wireup::cleanup();

        if (est_num_eps == 1) {
            m_num_lanes = num_lanes;
        } else if (has_only_unscalable) {
            /* If has only unscalable transports, check that the number of
             * lanes is the same as for the case when "est_num_eps == 1" */
            res = (num_lanes == m_num_lanes);
        }

        return res;
    }

private:

    /* The number of lanes activated for the case when "est_num_eps == 1" */
    size_t m_num_lanes;
};
991
/*
 * Start with NUM_EPS=1, then keep probing around the smallest max_num_eps
 * reported by the previous round (one below, exactly at, and one above it).
 * The loop stops once the reported limit is UCS_ULUNITS_INF or no longer
 * changes between rounds.
 */
UCS_TEST_P(test_ucp_wireup_fallback, est_num_eps_fallback) {
    unsigned long limit, next_limit;
    size_t prev_limit = 0;

    test_est_num_eps_fallback(1, limit);

    for (;;) {
        if (limit == UCS_ULUNITS_INF) {
            break;
        }

        if (limit == prev_limit) {
            /* number of EPs did not change since the previous iteration */
            break;
        }

        if (limit > 1) {
            EXPECT_TRUE(test_est_num_eps_fallback(limit - 1, next_limit));
        }

        EXPECT_TRUE(test_est_num_eps_fallback(limit, next_limit));

        EXPECT_TRUE(test_est_num_eps_fallback(limit + 1, next_limit));

        prev_limit = limit;
        limit      = next_limit;
    }
}
1015
1016 /* Test fallback from RC to UD, since RC isn't scalable enough
1017 * as its iface max_num_eps attribute = 256 by default */
1018 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1019 rc_ud, "rc_x,rc_v,ud_x,ud_v")
1020 /* Test fallback selection of UD only TLs, since TCP shouldn't
1021 * be used for any lanes */
1022 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1023 ud_tcp, "ud_x,ud_v,tcp")
1024 /* Test two scalable enough transports */
1025 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1026 dc_ud, "dc_x,ud_x,ud_v")
1027 /* Test unscalable transports only */
1028 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1029 rc, "rc_x,rc_v")
1030 /* Test all available IB transports */
1031 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1032 ib, "ib")
1033 /* Test on TCP only */
1034 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
1035 tcp, "tcp")
1036
1037 class test_ucp_wireup_unified : public test_ucp_wireup {
1038 public:
1039 static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)1040 enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
1041 const std::string& test_case_name, const std::string& tls)
1042 {
1043 std::vector<ucp_test_param> result;
1044 ucp_params_t tmp_ctx_params = ctx_params;
1045
1046 tmp_ctx_params.features = UCP_FEATURE_TAG;
1047
1048 generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/uni",
1049 tls, TEST_TAG | UNIFIED_MODE, result);
1050 return result;
1051 }
1052
context_has_tls(ucp_context_h ctx,const std::string & tl,ucp_rsc_index_t md_idx)1053 bool context_has_tls(ucp_context_h ctx, const std::string& tl,
1054 ucp_rsc_index_t md_idx)
1055 {
1056 for (ucp_rsc_index_t idx = 0; idx < ctx->num_tls; ++idx) {
1057 if (ctx->tl_rscs[idx].md_index != md_idx) {
1058 continue;
1059 }
1060
1061 if (!strcmp(ctx->tl_rscs[idx].tl_rsc.tl_name, tl.c_str())) {
1062 return true;
1063 }
1064 }
1065
1066 return false;
1067 }
1068
worker_has_tls(ucp_worker_h worker,const std::string & tl,ucp_rsc_index_t md_idx)1069 bool worker_has_tls(ucp_worker_h worker, const std::string& tl,
1070 ucp_rsc_index_t md_idx)
1071 {
1072 ucp_context_h ctx = worker->context;
1073
1074 for (unsigned i = 0; i < worker->num_ifaces; ++i) {
1075 ucp_worker_iface_t *wiface = worker->ifaces[i];
1076 ucp_rsc_index_t md_idx_it = ctx->tl_rscs[wiface->rsc_index].md_index;
1077
1078 if (md_idx_it != md_idx) {
1079 continue;
1080 }
1081
1082 char* name = ctx->tl_rscs[wiface->rsc_index].tl_rsc.tl_name;
1083 if (!strcmp(name, tl.c_str())) {
1084 return true;
1085 }
1086 }
1087 return false;
1088 }
1089
check_unified_ifaces(entity * e,const std::string & better_tl,const std::string & tl)1090 void check_unified_ifaces(entity *e,
1091 const std::string& better_tl,
1092 const std::string& tl)
1093 {
1094 ucp_context_h ctx = e->ucph();
1095 ucp_worker_h worker = e->worker();
1096
1097 for (ucp_rsc_index_t i = 0; i < ctx->num_mds; ++i) {
1098 if (!(context_has_tls(ctx, better_tl, i) &&
1099 context_has_tls(ctx, tl, i))) {
1100 continue;
1101 }
1102
1103 ASSERT_TRUE(ctx->num_tls > worker->num_ifaces);
1104 EXPECT_TRUE(worker_has_tls(worker, better_tl, i));
1105 EXPECT_FALSE(worker_has_tls(worker, tl, i));
1106 }
1107 }
1108 };
1109
1110
/*
 * In unified mode the worker should keep only the preferred transport of
 * each competing pair; which one is preferred depends on NUM_EPS for RC/DC.
 */
UCS_TEST_P(test_ucp_wireup_unified, select_best_ifaces)
{
    ucp_test::entity *small_scale = &sender();

    // Accelerated (mlx5) transports have better performance characteristics
    // than their verbs counterparts, so the verbs transports must not be used
    // by a worker in unified mode.
    check_unified_ifaces(small_scale, "rc_mlx5", "rc_verbs");
    check_unified_ifaces(small_scale, "ud_mlx5", "ud_verbs");

    // RC and DC have similar capabilities, but RC has better latency while
    // the estimated number of endpoints is relatively small.
    // sender() is created with 1 ep, so RC should be selected over DC.
    check_unified_ifaces(small_scale, "rc_mlx5", "dc_mlx5");

    // With a large enough number of endpoints DC becomes more efficient
    // than RC: a freshly created entity must now prefer DC over RC.
    modify_config("NUM_EPS", "1000");
    entity *large_scale = create_entity();
    check_unified_ifaces(large_scale, "dc_mlx5", "rc_mlx5");
}
1130
1131 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, rc, "rc")
1132 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, ud, "ud")
1133 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, rc_dc, "rc,dc")
1134
1135 class test_ucp_wireup_fallback_amo : public test_ucp_wireup {
init()1136 void init() {
1137 size_t device_atomics_cnt = 0;
1138
1139 test_ucp_wireup::init();
1140
1141 for (ucp_rsc_index_t idx = 0; idx < sender().ucph()->num_tls; ++idx) {
1142 uct_iface_attr_t *attr = ucp_worker_iface_get_attr(sender().worker(),
1143 idx);
1144 if (attr->cap.flags & UCT_IFACE_FLAG_ATOMIC_DEVICE) {
1145 device_atomics_cnt++;
1146 }
1147 }
1148 bool device_atomics_supported = sender().worker()->atomic_tls != 0;
1149
1150 test_ucp_wireup::cleanup();
1151
1152 if (!device_atomics_supported || !device_atomics_cnt) {
1153 UCS_TEST_SKIP_R("there are no TLs that support device atomics");
1154 }
1155 }
1156
cleanup()1157 void cleanup() {
1158 /* do nothing */
1159 }
1160
1161 protected:
1162
use_device_amo(ucp_ep_h ep)1163 bool use_device_amo(ucp_ep_h ep) {
1164 ucp_ep_config_t *ep_config = ucp_ep_config(ep);
1165
1166 for (ucp_lane_index_t lane = 0; lane < UCP_MAX_LANES; ++lane) {
1167 if (ep_config->key.amo_lanes[lane] != UCP_NULL_LANE) {
1168 return (ucp_ep_get_iface_attr(ep, lane)->cap.flags &
1169 UCT_IFACE_FLAG_ATOMIC_DEVICE);
1170 }
1171 }
1172
1173 return false;
1174 }
1175
get_min_max_num_eps(ucp_ep_h ep)1176 size_t get_min_max_num_eps(ucp_ep_h ep) {
1177 unsigned long min_max_num_eps = UCS_ULUNITS_INF;
1178
1179 for (ucp_lane_index_t lane = 0; lane < ucp_ep_num_lanes(ep); lane++) {
1180 uct_iface_attr_t *iface_attr = ucp_ep_get_iface_attr(ep, lane);
1181
1182 if (iface_attr->max_num_eps < min_max_num_eps) {
1183 min_max_num_eps = iface_attr->max_num_eps;
1184 }
1185 }
1186
1187 return min_max_num_eps;
1188 }
1189
test_wireup_fallback_amo(const std::vector<std::string> & tls,size_t est_num_eps,bool should_use_device_amo)1190 size_t test_wireup_fallback_amo(const std::vector<std::string> &tls,
1191 size_t est_num_eps, bool should_use_device_amo) {
1192 unsigned long min_max_num_eps = UCS_ULUNITS_INF;
1193
1194 UCS_TEST_MESSAGE << "Testing " << est_num_eps << " number of EPs";
1195 modify_config("NUM_EPS", ucs::to_string(est_num_eps).c_str());
1196
1197 // Create new entity and add to to the end of vector
1198 // (thus it will be receiver without any connections)
1199 create_entity(false);
1200
1201 ucp_test_param params = GetParam();
1202 for (std::vector<std::string>::const_iterator i = tls.begin();
1203 i != tls.end(); ++i) {
1204 params.transports.clear();
1205 params.transports.push_back(*i);
1206 create_entity(true, params);
1207 sender().connect(&receiver(), get_ep_params());
1208
1209 EXPECT_EQ(should_use_device_amo, use_device_amo(sender().ep()));
1210
1211 size_t max_num_eps = get_min_max_num_eps(sender().ep());
1212 if (max_num_eps < min_max_num_eps) {
1213 min_max_num_eps = max_num_eps;
1214 }
1215 }
1216
1217 test_ucp_wireup::cleanup();
1218
1219 return min_max_num_eps;
1220 }
1221
1222 public:
1223
get_ctx_params()1224 static ucp_params_t get_ctx_params() {
1225 ucp_params_t params = test_ucp_wireup::get_ctx_params();
1226 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
1227 params.features |= (UCP_FEATURE_AMO32 |
1228 UCP_FEATURE_AMO64);
1229 return params;
1230 }
1231 };
1232
1233 class test_ucp_wireup_amo : public test_ucp_wireup {
1234 public:
1235 typedef struct {
1236 test_ucp_wireup_amo *test;
1237 } request_t;
1238
get_ctx_params()1239 static ucp_params_t get_ctx_params() {
1240 ucp_params_t params = test_ucp_wireup::get_ctx_params();
1241 params.field_mask |= UCP_PARAM_FIELD_REQUEST_SIZE;
1242 params.request_size = sizeof(request_t);
1243 return params;
1244 }
1245
1246 static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)1247 enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
1248 const std::string& test_case_name, const std::string& tls)
1249 {
1250 uint64_t amo_features;
1251
1252 EXPECT_TRUE((sizeof(elem_type) == 4ul) || (sizeof(elem_type) == 8ul));
1253 amo_features = (sizeof(elem_type) == 4ul) ? UCP_FEATURE_AMO32 :
1254 UCP_FEATURE_AMO64;
1255 return enum_test_params_features(ctx_params, name, test_case_name, tls,
1256 amo_features, false);
1257 }
1258
1259 protected:
get_rkey(const entity & e)1260 ucp_rkey_h get_rkey(const entity &e) {
1261 if (&sender() == &e) {
1262 return test_ucp_wireup::get_rkey(e.ep(), m_memh_receiver);
1263 } else if (&receiver() == &e) {
1264 return test_ucp_wireup::get_rkey(e.ep(), m_memh_sender);
1265 }
1266
1267 return NULL;
1268 }
1269
add_rkey(ucp_rkey_h rkey)1270 void add_rkey(ucp_rkey_h rkey) {
1271 ASSERT_NE((ucp_rkey_h)NULL, rkey);
1272 m_rkeys.push_back(ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy));
1273 }
1274
fill_send_data()1275 void fill_send_data() {
1276 m_send_data[0] = ucs_generate_uuid(0);
1277 }
1278
flush_cb(void * req,ucs_status_t status)1279 static void flush_cb(void *req, ucs_status_t status) {
1280 request_t *request = (request_t *)req;
1281
1282 ASSERT_UCS_OK(status);
1283 request->test->rkeys_cleanup();
1284 request->test->memhs_cleanup();
1285 }
1286 };
1287
/*
 * Post an atomic ADD to the receiver's buffer, then flush the endpoint with
 * a callback (flush_cb) that releases the rkeys and memory handles.
 */
UCS_TEST_P(test_ucp_wireup_amo, relese_key_after_flush) {
    fill_send_data();
    clear_recv_data();

    sender().connect(&receiver(), get_ep_params());

    ucp_rkey_h rkey = get_rkey(sender());
    add_rkey(rkey);

    const uint64_t remote_addr = (uint64_t)&m_recv_data[0];
    ucs_status_t status        = ucp_atomic_post(sender().ep(),
                                                 UCP_ATOMIC_POST_OP_ADD,
                                                 m_send_data[0],
                                                 sizeof(elem_type),
                                                 remote_addr, rkey);
    ASSERT_UCS_OK(status);

    request_t *req = (request_t *)ucp_ep_flush_nb(sender().ep(), 0, flush_cb);
    if (!UCS_PTR_IS_PTR(req)) {
        /* no request object was returned; only the status can be checked */
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    } else {
        /* flush_cb needs the fixture to release the keys */
        req->test = this;
        wait(req);
    }
}
1309
1310 UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_amo)
1311
/*
 * Run the AMO fallback check twice: first with NUM_EPS=1, expecting device
 * atomics to be used, then with NUM_EPS one above the smallest max_num_eps
 * reported by the first run, expecting them not to be used.
 */
UCS_TEST_P(test_ucp_wireup_fallback_amo, different_amo_types) {
    static const char *const peer_tls[] = {
        "rc",     /* the 1st peer supports RC only (device atomics) */
        "rc,shm"  /* the 2nd peer supports RC and SHM (device and CPU atomics) */
    };
    const size_t num_peers = sizeof(peer_tls) / sizeof(peer_tls[0]);

    std::vector<std::string> tls(peer_tls, peer_tls + num_peers);

    const size_t min_max_num_eps = test_wireup_fallback_amo(tls, 1, true);
    test_wireup_fallback_amo(tls, min_max_num_eps + 1, false);
}
1323
1324 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback_amo,
1325 shm_rc, "shm,rc_x,rc_v")
1326
1327 /* NOTE: this fixture is NOT inherited from test_ucp_wireup, because we want to
1328 * create our own entities.
1329 */
1330 class test_ucp_wireup_asymmetric : public ucp_test {
1331 protected:
init()1332 virtual void init() {
1333 static const char *ibdev_sysfs_dir = "/sys/class/infiniband";
1334
1335 DIR *dir = opendir(ibdev_sysfs_dir);
1336 if (dir == NULL) {
1337 UCS_TEST_SKIP_R(std::string(ibdev_sysfs_dir) + " not found");
1338 }
1339
1340 for (;;) {
1341 struct dirent *entry = readdir(dir);
1342 if (entry == NULL) {
1343 break;
1344 }
1345
1346 if (entry->d_name[0] == '.') {
1347 continue;
1348 }
1349
1350 m_ib_devices.push_back(entry->d_name);
1351 }
1352
1353 closedir(dir);
1354 }
1355
tag_sendrecv(size_t size)1356 void tag_sendrecv(size_t size) {
1357 std::string send_data(size, 's');
1358 std::string recv_data(size, 'x');
1359
1360 ucs_status_ptr_t sreq = ucp_tag_send_nb(
1361 sender().ep(0), &send_data[0], size,
1362 ucp_dt_make_contig(1), 1,
1363 (ucp_send_callback_t)ucs_empty_function);
1364 ucs_status_ptr_t rreq = ucp_tag_recv_nb(
1365 receiver().worker(), &recv_data[0], size,
1366 ucp_dt_make_contig(1), 1, 1,
1367 (ucp_tag_recv_callback_t)ucs_empty_function);
1368 wait(sreq);
1369 wait(rreq);
1370
1371 EXPECT_EQ(send_data, recv_data);
1372 }
1373
1374 /* Generate a pci_bw configuration string for IB devices, which assigns
1375 * the speed ai+b for device i.
1376 */
pci_bw_config(int a,int b)1377 std::string pci_bw_config(int a, int b) {
1378 std::string config_str;
1379 for (size_t i = 0; i < m_ib_devices.size(); ++i) {
1380 config_str += m_ib_devices[i] + ":" +
1381 ucs::to_string((a * i) + b) + "Gbps";
1382 if (i != (m_ib_devices.size() - 1)) {
1383 config_str += ",";
1384 }
1385 }
1386 return config_str;
1387 }
1388
1389 std::vector<std::string> m_ib_devices;
1390
1391 public:
get_ctx_params()1392 static ucp_params_t get_ctx_params() {
1393 ucp_params_t params = ucp_test::get_ctx_params();
1394 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
1395 params.features = UCP_FEATURE_TAG;
1396 return params;
1397 }
1398 };
1399
1400 /*
1401 * Force asymmetric configuration by different PCI_BW settings
1402 */
UCS_TEST_SKIP_COND_P(test_ucp_wireup_asymmetric,connect,is_self ())1403 UCS_TEST_SKIP_COND_P(test_ucp_wireup_asymmetric, connect, is_self()) {
1404
1405 /* Enable cross-dev connection */
1406 /* coverity[tainted_string_argument] */
1407 ucs::scoped_setenv path_mtu_env("UCX_RC_PATH_MTU", "1024");
1408
1409 {
1410 std::string config_str = pci_bw_config(20, 20);
1411 UCS_TEST_MESSAGE << "creating sender: " << config_str;
1412 /* coverity[tainted_string_argument] */
1413 ucs::scoped_setenv pci_bw_env("UCX_IB_PCI_BW", config_str.c_str());
1414 create_entity();
1415 }
1416
1417 {
1418 std::string config_str = pci_bw_config(-20, m_ib_devices.size() * 20);
1419 UCS_TEST_MESSAGE << "creating receiver: " << config_str;
1420 /* coverity[tainted_string_argument] */
1421 ucs::scoped_setenv pci_bw_env("UCX_IB_PCI_BW", config_str.c_str());
1422 create_entity();
1423 }
1424
1425 sender().connect(&receiver(), get_ep_params());
1426 receiver().connect(&sender(), get_ep_params());
1427
1428 ucp_ep_print_info(sender().ep(), stdout);
1429 ucp_ep_print_info(receiver().ep(), stdout);
1430
1431 tag_sendrecv(1);
1432 tag_sendrecv(100000);
1433 tag_sendrecv(1000000);
1434 }
1435
1436 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, rcv, "rc_v")
1437 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, rcx, "rc_x")
1438 UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, ib, "ib")
1439