1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2017. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include "ucp_test.h"
8 #include "common/test.h"
9 #include "ucp/ucp_test.h"
10
11 #include <common/test_helpers.h>
12 #include <ucs/sys/sys.h>
13 #include <ifaddrs.h>
14 #include <sys/poll.h>
15
16 extern "C" {
17 #include <ucp/core/ucp_listener.h>
18 }
19
/*
 * Instantiate a test case for the default transports and for several
 * explicit transport sets:
 *  - dc_ud:      tests handling of a large worker address on
 *                UCT_IFACE_FLAG_CONNECT_TO_IFACE transports (dc_x).
 *  - no_ud_ud_x: tests handling of a large worker address when the ud/ud_x
 *                transports are missing; this is expected to return an error
 *                and the test is then skipped.
 */
#define UCP_INSTANTIATE_ALL_TEST_CASE(_test_case) \
    UCP_INSTANTIATE_TEST_CASE(_test_case) \
    UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, shm, "shm") \
    UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, dc_ud, "dc_x,ud_v,ud_x,mm") \
    UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, no_ud_ud_x, "dc_x,mm")
30
31 class test_ucp_sockaddr : public ucp_test {
32 public:
get_ctx_params()33 static ucp_params_t get_ctx_params() {
34 ucp_params_t params = ucp_test::get_ctx_params();
35 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
36 params.features = UCP_FEATURE_TAG | UCP_FEATURE_STREAM;
37 return params;
38 }
39
40 enum {
41 CONN_REQ_TAG = DEFAULT_PARAM_VARIANT + 1, /* Accepting by ucp_conn_request_h,
42 send/recv by TAG API */
43 CONN_REQ_STREAM /* Accepting by ucp_conn_request_h,
44 send/recv by STREAM API */
45 };
46
47 enum {
48 TEST_MODIFIER_MASK = UCS_MASK(16),
49 TEST_MODIFIER_MT = UCS_BIT(16),
50 TEST_MODIFIER_CM = UCS_BIT(17)
51 };
52
53 enum {
54 SEND_DIRECTION_C2S = UCS_BIT(0), /* send data from client to server */
55 SEND_DIRECTION_S2C = UCS_BIT(1), /* send data from server to client */
56 SEND_DIRECTION_BIDI = SEND_DIRECTION_C2S | SEND_DIRECTION_S2C /* bidirectional send */
57 };
58
59 typedef enum {
60 SEND_RECV_TAG,
61 SEND_RECV_STREAM
62 } send_recv_type_t;
63
64 ucs::sock_addr_storage m_test_addr;
65
init()66 void init() {
67 if (GetParam().variant & TEST_MODIFIER_CM) {
68 modify_config("SOCKADDR_CM_ENABLE", "yes");
69 }
70 get_sockaddr();
71 ucp_test::init();
72 skip_loopback();
73 }
74
75 static void
enum_test_params_with_modifier(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls,std::vector<ucp_test_param> & result,unsigned modifier)76 enum_test_params_with_modifier(const ucp_params_t& ctx_params,
77 const std::string& name,
78 const std::string& test_case_name,
79 const std::string& tls,
80 std::vector<ucp_test_param> &result,
81 unsigned modifier)
82 {
83 generate_test_params_variant(ctx_params, name, test_case_name, tls,
84 modifier, result, SINGLE_THREAD);
85 generate_test_params_variant(ctx_params, name, test_case_name, tls,
86 modifier | TEST_MODIFIER_MT, result,
87 MULTI_THREAD_WORKER);
88 }
89
90 static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)91 enum_test_params(const ucp_params_t& ctx_params,
92 const std::string& name,
93 const std::string& test_case_name,
94 const std::string& tls)
95 {
96 std::vector<ucp_test_param> result =
97 ucp_test::enum_test_params(ctx_params, name, test_case_name, tls);
98
99 enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
100 result, CONN_REQ_TAG);
101 enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
102 result, CONN_REQ_TAG | TEST_MODIFIER_CM);
103 enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
104 result, CONN_REQ_STREAM);
105 enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
106 result, CONN_REQ_STREAM | TEST_MODIFIER_CM);
107 return result;
108 }
109
110 static ucs_log_func_rc_t
detect_error_logger(const char * file,unsigned line,const char * function,ucs_log_level_t level,const ucs_log_component_config_t * comp_conf,const char * message,va_list ap)111 detect_error_logger(const char *file, unsigned line, const char *function,
112 ucs_log_level_t level,
113 const ucs_log_component_config_t *comp_conf,
114 const char *message, va_list ap)
115 {
116 if (level == UCS_LOG_LEVEL_ERROR) {
117 static std::vector<std::string> stop_list;
118 if (stop_list.empty()) {
119 stop_list.push_back("no supported sockaddr auxiliary transports found for");
120 stop_list.push_back("sockaddr aux resources addresses");
121 stop_list.push_back("no peer failure handler");
122 stop_list.push_back("connection request failed on listener");
123 /* when the "peer failure" error happens, it is followed by: */
124 stop_list.push_back("received event RDMA_CM_EVENT_UNREACHABLE");
125 stop_list.push_back(ucs_status_string(UCS_ERR_UNREACHABLE));
126 stop_list.push_back(ucs_status_string(UCS_ERR_UNSUPPORTED));
127 }
128
129 std::string err_str = format_message(message, ap);
130 for (size_t i = 0; i < stop_list.size(); ++i) {
131 if (err_str.find(stop_list[i]) != std::string::npos) {
132 UCS_TEST_MESSAGE << err_str;
133 return UCS_LOG_FUNC_RC_STOP;
134 }
135 }
136 }
137 return UCS_LOG_FUNC_RC_CONTINUE;
138 }
139
get_sockaddr()140 void get_sockaddr() {
141 std::vector<ucs::sock_addr_storage> saddrs;
142 struct ifaddrs* ifaddrs;
143 ucs_status_t status;
144 size_t size;
145 int ret = getifaddrs(&ifaddrs);
146 ASSERT_EQ(ret, 0);
147
148 for (struct ifaddrs *ifa = ifaddrs; ifa != NULL; ifa = ifa->ifa_next) {
149 if (ucs_netif_flags_is_active(ifa->ifa_flags) &&
150 ucs::is_inet_addr(ifa->ifa_addr) &&
151 ucs::is_rdmacm_netdev(ifa->ifa_name))
152 {
153 saddrs.push_back(ucs::sock_addr_storage());
154 status = ucs_sockaddr_sizeof(ifa->ifa_addr, &size);
155 ASSERT_UCS_OK(status);
156 saddrs.back().set_sock_addr(*ifa->ifa_addr, size);
157 saddrs.back().set_port(0); /* listen on any port then update */
158 }
159 }
160
161 freeifaddrs(ifaddrs);
162
163 if (saddrs.empty()) {
164 UCS_TEST_SKIP_R("No interface for testing");
165 }
166
167 static const std::string dc_tls[] = { "dc", "dc_x", "ib" };
168
169 bool has_dc = has_any_transport(
170 std::vector<std::string>(dc_tls, dc_tls + ucs_array_size(dc_tls)));
171
172 /* FIXME: select random interface, except for DC transport, which do not
173 yet support having different gid_index for different UCT
174 endpoints on same iface */
175 int saddr_idx = has_dc ? 0 : (ucs::rand() % saddrs.size());
176 m_test_addr = saddrs[saddr_idx];
177 }
178
start_listener(ucp_test_base::entity::listen_cb_type_t cb_type)179 void start_listener(ucp_test_base::entity::listen_cb_type_t cb_type)
180 {
181 ucs_time_t deadline = ucs::get_deadline();
182 ucs_status_t status;
183
184 do {
185 status = receiver().listen(cb_type, m_test_addr.get_sock_addr_ptr(),
186 m_test_addr.get_addr_size(),
187 get_server_ep_params());
188 } while ((status == UCS_ERR_BUSY) && (ucs_get_time() < deadline));
189
190 if (status == UCS_ERR_UNREACHABLE) {
191 UCS_TEST_SKIP_R("cannot listen to " + m_test_addr.to_str());
192 }
193
194 ASSERT_UCS_OK(status);
195 ucp_listener_attr_t attr;
196 uint16_t port;
197
198 attr.field_mask = UCP_LISTENER_ATTR_FIELD_SOCKADDR;
199 ASSERT_UCS_OK(ucp_listener_query(receiver().listenerh(), &attr));
200 ASSERT_UCS_OK(ucs_sockaddr_get_port(
201 (const struct sockaddr *)&attr.sockaddr, &port));
202 m_test_addr.set_port(port);
203 UCS_TEST_MESSAGE << "server listening on " << m_test_addr.to_str();
204 }
205
scomplete_cb(void * req,ucs_status_t status)206 static void scomplete_cb(void *req, ucs_status_t status)
207 {
208 if ((status == UCS_OK) ||
209 (status == UCS_ERR_UNREACHABLE) ||
210 (status == UCS_ERR_REJECTED)) {
211 return;
212 }
213 UCS_TEST_ABORT("Error: " << ucs_status_string(status));
214 }
215
rtag_complete_cb(void * req,ucs_status_t status,ucp_tag_recv_info_t * info)216 static void rtag_complete_cb(void *req, ucs_status_t status,
217 ucp_tag_recv_info_t *info)
218 {
219 EXPECT_UCS_OK(status);
220 }
221
rstream_complete_cb(void * req,ucs_status_t status,size_t length)222 static void rstream_complete_cb(void *req, ucs_status_t status,
223 size_t length)
224 {
225 EXPECT_UCS_OK(status);
226 }
227
wait_for_wakeup(ucp_worker_h send_worker,ucp_worker_h recv_worker)228 static void wait_for_wakeup(ucp_worker_h send_worker, ucp_worker_h recv_worker)
229 {
230 int ret, send_efd, recv_efd;
231 ucs_status_t status;
232
233 ASSERT_UCS_OK(ucp_worker_get_efd(send_worker, &send_efd));
234 ASSERT_UCS_OK(ucp_worker_get_efd(recv_worker, &recv_efd));
235
236 status = ucp_worker_arm(recv_worker);
237 if (status == UCS_ERR_BUSY) {
238 return;
239 }
240 ASSERT_UCS_OK(status);
241
242 status = ucp_worker_arm(send_worker);
243 if (status == UCS_ERR_BUSY) {
244 return;
245 }
246 ASSERT_UCS_OK(status);
247
248 do {
249 struct pollfd pfd[2];
250 pfd[0].fd = send_efd;
251 pfd[1].fd = recv_efd;
252 pfd[0].events = POLLIN;
253 pfd[1].events = POLLIN;
254 ret = poll(pfd, 2, -1);
255 } while ((ret < 0) && (errno == EINTR));
256 if (ret < 0) {
257 UCS_TEST_MESSAGE << "poll() failed: " << strerror(errno);
258 }
259
260 EXPECT_GE(ret, 1);
261 }
262
check_events(ucp_worker_h send_worker,ucp_worker_h recv_worker,bool wakeup,void * req)263 void check_events(ucp_worker_h send_worker, ucp_worker_h recv_worker,
264 bool wakeup, void *req)
265 {
266 if (progress()) {
267 return;
268 }
269
270 if ((req != NULL) && (ucp_request_check_status(req) == UCS_ERR_UNREACHABLE)) {
271 return;
272 }
273
274 if (wakeup) {
275 wait_for_wakeup(send_worker, recv_worker);
276 }
277 }
278
send_recv(entity & from,entity & to,send_recv_type_t send_recv_type,bool wakeup,ucp_test_base::entity::listen_cb_type_t cb_type)279 void send_recv(entity& from, entity& to, send_recv_type_t send_recv_type,
280 bool wakeup, ucp_test_base::entity::listen_cb_type_t cb_type)
281 {
282 const uint64_t send_data = ucs_generate_uuid(0);
283 void *send_req = NULL;
284 if (send_recv_type == SEND_RECV_TAG) {
285 send_req = ucp_tag_send_nb(from.ep(), &send_data, 1,
286 ucp_dt_make_contig(sizeof(send_data)), 1,
287 scomplete_cb);
288 } else if (send_recv_type == SEND_RECV_STREAM) {
289 send_req = ucp_stream_send_nb(from.ep(), &send_data, 1,
290 ucp_dt_make_contig(sizeof(send_data)),
291 scomplete_cb, 0);
292 } else {
293 ASSERT_TRUE(false) << "unsupported communication type";
294 }
295
296 ucs_status_t send_status;
297 if (send_req == NULL) {
298 send_status = UCS_OK;
299 } else if (UCS_PTR_IS_ERR(send_req)) {
300 send_status = UCS_PTR_STATUS(send_req);
301 ASSERT_UCS_OK(send_status);
302 } else {
303 while (!ucp_request_is_completed(send_req)) {
304 check_events(from.worker(), to.worker(), wakeup, send_req);
305 }
306 send_status = ucp_request_check_status(send_req);
307 ucp_request_free(send_req);
308 }
309
310 if (send_status == UCS_ERR_UNREACHABLE) {
311 /* Check if the error was completed due to the error handling flow.
312 * If so, skip the test since a valid error occurred - the one expected
313 * from the error handling flow - cases of failure to handle long worker
314 * address or transport doesn't support the error handling requirement */
315 UCS_TEST_SKIP_R("Skipping due an unreachable destination (unsupported "
316 "feature or too long worker address or no "
317 "supported transport to send partial worker "
318 "address)");
319 } else if ((send_status == UCS_ERR_REJECTED) &&
320 (cb_type == ucp_test_base::entity::LISTEN_CB_REJECT)) {
321 return;
322 } else {
323 ASSERT_UCS_OK(send_status);
324 }
325
326 uint64_t recv_data = 0;
327 void *recv_req;
328 if (send_recv_type == SEND_RECV_TAG) {
329 recv_req = ucp_tag_recv_nb(to.worker(), &recv_data, 1,
330 ucp_dt_make_contig(sizeof(recv_data)),
331 1, 0, rtag_complete_cb);
332 } else {
333 ASSERT_TRUE(send_recv_type == SEND_RECV_STREAM);
334 ucp_stream_poll_ep_t poll_eps;
335 ssize_t ep_count;
336 size_t recv_length;
337 do {
338 progress();
339 ep_count = ucp_stream_worker_poll(to.worker(), &poll_eps, 1, 0);
340 } while (ep_count == 0);
341 ASSERT_EQ(1, ep_count);
342 EXPECT_EQ(to.ep(), poll_eps.ep);
343 EXPECT_EQ(&to, poll_eps.user_data);
344
345 recv_req = ucp_stream_recv_nb(to.ep(), &recv_data, 1,
346 ucp_dt_make_contig(sizeof(recv_data)),
347 rstream_complete_cb, &recv_length,
348 UCP_STREAM_RECV_FLAG_WAITALL);
349 }
350
351 if (recv_req != NULL) {
352 ASSERT_TRUE(UCS_PTR_IS_PTR(recv_req));
353 while (!ucp_request_is_completed(recv_req)) {
354 check_events(from.worker(), to.worker(), wakeup, recv_req);
355 }
356 ucp_request_free(recv_req);
357 }
358
359 EXPECT_EQ(send_data, recv_data);
360 }
361
wait_for_server_ep(bool wakeup)362 bool wait_for_server_ep(bool wakeup)
363 {
364 ucs_time_t deadline = ucs::get_deadline();
365
366 while ((receiver().get_num_eps() == 0) &&
367 (sender().get_err_num() == 0) && (ucs_get_time() < deadline)) {
368 check_events(sender().worker(), receiver().worker(), wakeup, NULL);
369 }
370
371 return (sender().get_err_num() == 0) && (receiver().get_num_eps() > 0);
372 }
373
wait_for_reject(entity & e,bool wakeup)374 void wait_for_reject(entity &e, bool wakeup)
375 {
376 ucs_time_t deadline = ucs::get_deadline();
377
378 while ((e.get_err_num_rejected() == 0) && (ucs_get_time() < deadline)) {
379 check_events(sender().worker(), receiver().worker(), wakeup, NULL);
380 }
381
382 EXPECT_GT(deadline, ucs_get_time());
383 EXPECT_EQ(1ul, e.get_err_num_rejected());
384 }
385
get_ep_params()386 virtual ucp_ep_params_t get_ep_params()
387 {
388 ucp_ep_params_t ep_params = ucp_test::get_ep_params();
389 ep_params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
390 UCP_EP_PARAM_FIELD_ERR_HANDLER;
391 /* The error handling requirement is needed since we need to take
392 * care of a case where the client gets an error. In case ucp needs to
393 * handle a large worker address but neither ud nor ud_x are present */
394 ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
395 ep_params.err_handler.cb = err_handler_cb;
396 ep_params.err_handler.arg = NULL;
397 return ep_params;
398 }
399
get_server_ep_params()400 virtual ucp_ep_params_t get_server_ep_params() {
401 return get_ep_params();
402 }
403
client_ep_connect()404 void client_ep_connect()
405 {
406 ucp_ep_params_t ep_params = get_ep_params();
407 ep_params.field_mask |= UCP_EP_PARAM_FIELD_FLAGS |
408 UCP_EP_PARAM_FIELD_SOCK_ADDR |
409 UCP_EP_PARAM_FIELD_USER_DATA;
410 ep_params.flags = UCP_EP_PARAMS_FLAGS_CLIENT_SERVER;
411 ep_params.sockaddr.addr = m_test_addr.get_sock_addr_ptr();
412 ep_params.sockaddr.addrlen = m_test_addr.get_addr_size();
413 ep_params.user_data = &sender();
414 sender().connect(&receiver(), ep_params);
415 }
416
connect_and_send_recv(bool wakeup,uint64_t flags)417 void connect_and_send_recv(bool wakeup, uint64_t flags)
418 {
419 {
420 scoped_log_handler slh(detect_error_logger);
421 client_ep_connect();
422 if (!wait_for_server_ep(wakeup)) {
423 UCS_TEST_SKIP_R("cannot connect to server");
424 }
425 }
426
427 if (flags & SEND_DIRECTION_C2S) {
428 send_recv(sender(), receiver(), send_recv_type(), wakeup,
429 cb_type());
430 }
431
432 if (flags & SEND_DIRECTION_S2C) {
433 send_recv(receiver(), sender(), send_recv_type(), wakeup,
434 cb_type());
435 }
436 }
437
connect_and_reject(bool wakeup)438 void connect_and_reject(bool wakeup)
439 {
440 {
441 scoped_log_handler slh(detect_error_logger);
442 client_ep_connect();
443 /* Check reachability with tagged send */
444 send_recv(sender(), receiver(), SEND_RECV_TAG, wakeup,
445 ucp_test_base::entity::LISTEN_CB_REJECT);
446 }
447 wait_for_reject(receiver(), wakeup);
448 wait_for_reject(sender(), wakeup);
449 }
450
listen_and_communicate(bool wakeup,uint64_t flags)451 void listen_and_communicate(bool wakeup, uint64_t flags)
452 {
453 UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
454
455 start_listener(cb_type());
456 connect_and_send_recv(wakeup, flags);
457 }
458
listen_and_reject(bool wakeup)459 void listen_and_reject(bool wakeup)
460 {
461 UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
462
463 start_listener(ucp_test_base::entity::LISTEN_CB_REJECT);
464 connect_and_reject(wakeup);
465 }
466
one_sided_disconnect(entity & e,enum ucp_ep_close_mode mode)467 void one_sided_disconnect(entity &e, enum ucp_ep_close_mode mode) {
468 void *req = e.disconnect_nb(0, 0, mode);
469 ucs_time_t deadline = ucs_time_from_sec(10.0) + ucs_get_time();
470 while (!is_request_completed(req) && (ucs_get_time() < deadline)) {
471 /* TODO: replace the progress() with e().progress() when
472 async progress is implemented. */
473 progress();
474 };
475
476 e.close_ep_req_free(req);
477 }
478
concurrent_disconnect(enum ucp_ep_close_mode mode)479 void concurrent_disconnect(enum ucp_ep_close_mode mode) {
480 ASSERT_EQ(2ul, entities().size());
481 ASSERT_EQ(1, sender().get_num_workers());
482 ASSERT_EQ(1, sender().get_num_eps());
483 ASSERT_EQ(1, receiver().get_num_workers());
484 ASSERT_EQ(1, receiver().get_num_eps());
485
486 void *sender_ep_close_req = sender().disconnect_nb(0, 0, mode);
487 void *receiver_ep_close_req = receiver().disconnect_nb(0, 0, mode);
488
489 ucs_time_t deadline = ucs::get_deadline();
490 while ((!is_request_completed(sender_ep_close_req) ||
491 !is_request_completed(receiver_ep_close_req)) &&
492 (ucs_get_time() < deadline)) {
493 progress();
494 }
495
496 sender().close_ep_req_free(sender_ep_close_req);
497 receiver().close_ep_req_free(receiver_ep_close_req);
498 }
499
err_handler_cb(void * arg,ucp_ep_h ep,ucs_status_t status)500 static void err_handler_cb(void *arg, ucp_ep_h ep, ucs_status_t status) {
501 ucp_test::err_handler_cb(arg, ep, status);
502
503 /* The current expected errors are only from the err_handle test
504 * and from transports where the worker address is too long but ud/ud_x
505 * are not present, or ud/ud_x are present but their addresses are too
506 * long as well, in addition we can get disconnect events during test
507 * teardown.
508 */
509 switch (status) {
510 case UCS_ERR_REJECTED:
511 case UCS_ERR_UNREACHABLE:
512 case UCS_ERR_CONNECTION_RESET:
513 UCS_TEST_MESSAGE << "ignoring error " <<ucs_status_string(status)
514 << " on endpoint " << ep;
515 return;
516 default:
517 UCS_TEST_ABORT("Error: " << ucs_status_string(status));
518 }
519 }
520
521 protected:
cb_type() const522 ucp_test_base::entity::listen_cb_type_t cb_type() const {
523 const int variant = (GetParam().variant & TEST_MODIFIER_MASK);
524 if ((variant == CONN_REQ_TAG) || (variant == CONN_REQ_STREAM)) {
525 return ucp_test_base::entity::LISTEN_CB_CONN;
526 }
527 return ucp_test_base::entity::LISTEN_CB_EP;
528 }
529
send_recv_type() const530 send_recv_type_t send_recv_type() const {
531 switch (GetParam().variant & TEST_MODIFIER_MASK) {
532 case CONN_REQ_STREAM:
533 return SEND_RECV_STREAM;
534 case CONN_REQ_TAG:
535 /* fallthrough */
536 default:
537 return SEND_RECV_TAG;
538 }
539 }
540
nonparameterized_test() const541 bool nonparameterized_test() const {
542 return (GetParam().variant != DEFAULT_PARAM_VARIANT) &&
543 (GetParam().variant != (CONN_REQ_TAG | TEST_MODIFIER_CM));
544 }
545
no_close_protocol() const546 bool no_close_protocol() const {
547 return !(GetParam().variant & TEST_MODIFIER_CM);
548 }
549 };
550
/* Connection establishment only, no data is exchanged */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, listen, no_close_protocol()) {
    const uint64_t direction = 0;
    listen_and_communicate(false, direction);
}

/* Client sends to the server */
UCS_TEST_P(test_ucp_sockaddr, listen_c2s) {
    const uint64_t direction = SEND_DIRECTION_C2S;
    listen_and_communicate(false, direction);
}

/* Server sends to the client */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, listen_s2c, no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_S2C;
    listen_and_communicate(false, direction);
}

/* Both sides send */
UCS_TEST_P(test_ucp_sockaddr, listen_bidi) {
    const uint64_t direction = SEND_DIRECTION_BIDI;
    listen_and_communicate(false, direction);
}
566
/* Connect, optionally exchange data, then close only the client endpoint */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect,
                     no_close_protocol()) {
    const uint64_t direction = 0; /* no data exchange */
    listen_and_communicate(false, direction);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect_c2s) {
    const uint64_t direction = SEND_DIRECTION_C2S;
    listen_and_communicate(false, direction);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect_s2c,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_S2C;
    listen_and_communicate(false, direction);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect_bidi) {
    const uint64_t direction = SEND_DIRECTION_BIDI;
    listen_and_communicate(false, direction);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}
588
/* Connect, optionally exchange data, then close both endpoints at once
 * (graceful flush mode) */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect,
                     no_close_protocol()) {
    const uint64_t direction = 0; /* no data exchange */
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_c2s,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_C2S;
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_s2c,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_S2C;
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_bidi) {
    const uint64_t direction = SEND_DIRECTION_BIDI;
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
}

/* Same scenarios, closing both endpoints in force mode */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force,
                     no_close_protocol()) {
    const uint64_t direction = 0; /* no data exchange */
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force_c2s,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_C2S;
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force_s2c,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_S2C;
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
}

UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_force_bidi) {
    const uint64_t direction = SEND_DIRECTION_BIDI;
    listen_and_communicate(false, direction);
    concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
}
634
/* The server listens on INADDR_ANY; the client connects to the interface
 * address selected for the test, on the port the listener obtained. */
UCS_TEST_P(test_ucp_sockaddr, listen_inaddr_any) {
    ucs::sock_addr_storage iface_addr(m_test_addr); /* remember interface */
    m_test_addr.reset_to_any();

    UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
    start_listener(cb_type());

    /* start_listener() stored the listener's actual port in m_test_addr;
     * carry it over to the interface address the client should use */
    iface_addr.set_port(m_test_addr.get_port());
    m_test_addr = iface_addr;

    connect_and_send_recv(false, SEND_DIRECTION_C2S);
}
649
/* The listener rejects every connection request */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, reject, nonparameterized_test()) {
    const bool use_wakeup = false;
    listen_and_reject(use_wakeup);
}

/* ucp_listener_query() must report the address the listener is bound to */
UCS_TEST_P(test_ucp_sockaddr, listener_query) {
    UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
    start_listener(cb_type());

    ucp_listener_attr_t attr;
    attr.field_mask = UCP_LISTENER_ATTR_FIELD_SOCKADDR;
    EXPECT_UCS_OK(ucp_listener_query(receiver().listenerh(), &attr));

    EXPECT_EQ(m_test_addr, attr.sockaddr);
}
668
/* A client connecting to a port nobody listens on must get exactly one
 * error reported through its error handler. */
UCS_TEST_P(test_ucp_sockaddr, err_handle) {
    ucs_status_t status = receiver().listen(cb_type(),
                                            m_test_addr.get_sock_addr_ptr(),
                                            m_test_addr.get_addr_size(),
                                            get_server_ep_params());
    if (status == UCS_ERR_UNREACHABLE) {
        UCS_TEST_SKIP_R("cannot listen to " + m_test_addr.to_str());
    }
    /* any other listen failure is a real error (as in start_listener()),
     * not something to silently continue past */
    ASSERT_UCS_OK(status);

    /* make the client try to connect to a non-existing port on the server side */
    m_test_addr.set_port(1);

    {
        scoped_log_handler slh(wrap_errors_logger);
        client_ep_connect();
        /* allow for the unreachable event to arrive before restoring errors */
        wait_for_flag(&sender().get_err_num());
    }

    EXPECT_EQ(1u, sender().get_err_num());
}

UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr)
694
695 class test_ucp_sockaddr_destroy_ep_on_err : public test_ucp_sockaddr {
696 public:
test_ucp_sockaddr_destroy_ep_on_err()697 test_ucp_sockaddr_destroy_ep_on_err() {
698 /* Set small TL timeouts to reduce testing time */
699 m_env.push_back(new ucs::scoped_setenv("UCX_RC_TIMEOUT", "10ms"));
700 m_env.push_back(new ucs::scoped_setenv("UCX_RC_RNR_TIMEOUT", "10ms"));
701 m_env.push_back(new ucs::scoped_setenv("UCX_RC_RETRY_COUNT", "2"));
702 }
703
get_server_ep_params()704 virtual ucp_ep_params_t get_server_ep_params() {
705 ucp_ep_params_t params = test_ucp_sockaddr::get_server_ep_params();
706
707 params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
708 UCP_EP_PARAM_FIELD_ERR_HANDLER |
709 UCP_EP_PARAM_FIELD_USER_DATA;
710 params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
711 params.err_handler.cb = err_handler_cb;
712 params.err_handler.arg = NULL;
713 params.user_data = &receiver();
714 return params;
715 }
716
err_handler_cb(void * arg,ucp_ep_h ep,ucs_status_t status)717 static void err_handler_cb(void *arg, ucp_ep_h ep, ucs_status_t status) {
718 test_ucp_sockaddr::err_handler_cb(arg, ep, status);
719 entity *e = reinterpret_cast<entity *>(arg);
720 e->disconnect_nb(0, 0, UCP_EP_CLOSE_MODE_FORCE);
721 }
722
723 private:
724 ucs::ptr_vector<ucs::scoped_setenv> m_env;
725 };
726
/* Plain connection and data exchange with self-destroying server endpoints */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, empty,
                     no_close_protocol()) {
    const uint64_t direction = 0; /* no data exchange */
    listen_and_communicate(false, direction);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, s2c,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_S2C;
    listen_and_communicate(false, direction);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, c2s,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_C2S;
    listen_and_communicate(false, direction);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, bidi,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_BIDI;
    listen_and_communicate(false, direction);
}
746
/*
 * "cforce": the client endpoint is force-closed first, then the server
 * endpoint is flushed. Errors logged during the disconnect are wrapped.
 */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_cforce,
                     no_close_protocol()) {
    const uint64_t direction = 0; /* no data exchange */
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_cforce,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_C2S;
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_cforce,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_S2C;
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_cforce,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_BIDI;
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
}

/*
 * "sforce": the server endpoint is force-closed first, then the client
 * endpoint is flushed.
 */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_sforce,
                     no_close_protocol()) {
    const uint64_t direction = 0; /* no data exchange */
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_sforce,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_C2S;
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_sforce,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_S2C;
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_sforce,
                     no_close_protocol()) {
    const uint64_t direction = SEND_DIRECTION_BIDI;
    listen_and_communicate(false, direction);
    scoped_log_handler wrap_err(wrap_errors_logger);
    one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
    one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
}

UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr_destroy_ep_on_err)
812
813 class test_ucp_sockaddr_with_wakeup : public test_ucp_sockaddr {
814 public:
get_ctx_params()815 static ucp_params_t get_ctx_params() {
816 ucp_params_t params = test_ucp_sockaddr::get_ctx_params();
817 params.features |= UCP_FEATURE_WAKEUP;
818 return params;
819 }
820 };
821
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup,wakeup,no_close_protocol ())822 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, wakeup,
823 no_close_protocol()) {
824 listen_and_communicate(true, 0);
825 }
826
UCS_TEST_P(test_ucp_sockaddr_with_wakeup,wakeup_c2s)827 UCS_TEST_P(test_ucp_sockaddr_with_wakeup, wakeup_c2s) {
828 listen_and_communicate(true, SEND_DIRECTION_C2S);
829 }
830
/* Wakeup-enabled context, data sent from server to client. Skipped when
 * no_close_protocol(). */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, wakeup_s2c,
                     no_close_protocol())
{
    listen_and_communicate(true, SEND_DIRECTION_S2C);
}
835
/* Wakeup-enabled context, data sent in both directions. */
UCS_TEST_P(test_ucp_sockaddr_with_wakeup, wakeup_bidi)
{
    listen_and_communicate(true, SEND_DIRECTION_BIDI);
}
839
/* Wakeup-enabled listen_and_reject flow; skipped for the non-parameterized
 * variant (nonparameterized_test()). */
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, reject,
                     nonparameterized_test())
{
    listen_and_reject(true);
}
844
/* Instantiate the wakeup fixture for every transport set listed in
 * UCP_INSTANTIATE_ALL_TEST_CASE. */
845 UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr_with_wakeup)
846
847
848 class test_ucp_sockaddr_with_rma_atomic : public test_ucp_sockaddr {
849 public:
850
get_ctx_params()851 static ucp_params_t get_ctx_params() {
852 ucp_params_t params = test_ucp_sockaddr::get_ctx_params();
853 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
854 params.features |= UCP_FEATURE_RMA |
855 UCP_FEATURE_AMO32 |
856 UCP_FEATURE_AMO64;
857 return params;
858 }
859 };
860
/*
 * Client-server connection when the required features are RMA/ATOMIC: with
 * these features a lane for ucp-wireup must still exist (an am_lane should be
 * created and used). If the server endpoint never shows up, exactly one
 * client error is expected and the test is skipped.
 */
UCS_TEST_P(test_ucp_sockaddr_with_rma_atomic, wireup)
{
    UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();

    start_listener(cb_type());

    {
        scoped_log_handler connect_log(wrap_errors_logger);

        client_ep_connect();

        /* give the err_handler callback a chance to run if needed */
        if (!wait_for_server_ep(false)) {
            EXPECT_EQ(1ul, sender().get_err_num());
            UCS_TEST_SKIP_R("cannot connect to server");
        }

        EXPECT_EQ(0ul, sender().get_err_num());

        /* A created server EP does not mean wireup is complete (e.g. with a
         * long worker address it finishes later), so exchange data. */
        send_recv(sender(), receiver(), send_recv_type(), false, cb_type());
    }
}
886
/* Instantiate the RMA/atomic wireup fixture for every transport set listed in
 * UCP_INSTANTIATE_ALL_TEST_CASE. */
887 UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr_with_rma_atomic)
888
889
890 class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
891 public:
get_ctx_params()892 static ucp_params_t get_ctx_params() {
893 ucp_params_t params = test_ucp_sockaddr::get_ctx_params();
894 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
895 params.features |= UCP_FEATURE_RMA | UCP_FEATURE_AM;
896 /* Atomics not supported for now because need to emulate the case
897 * of using different device than the one selected by default on the
898 * worker for atomic operations */
899 return params;
900 }
901
902 static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)903 enum_test_params(const ucp_params_t& ctx_params,
904 const std::string& name,
905 const std::string& test_case_name,
906 const std::string& tls)
907 {
908 std::vector<ucp_test_param> result;
909 enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
910 result, TEST_MODIFIER_CM);
911 return result;
912 }
913
init()914 virtual void init() {
915 test_ucp_sockaddr::init();
916 start_listener(cb_type());
917 client_ep_connect();
918 }
919
get_nb(std::string & send_buf,std::string & recv_buf,ucp_rkey_h rkey,std::vector<void * > & reqs)920 void get_nb(std::string& send_buf, std::string& recv_buf, ucp_rkey_h rkey,
921 std::vector<void*>& reqs)
922 {
923 reqs.push_back(ucp_get_nb(sender().ep(), &send_buf[0], send_buf.size(),
924 (uintptr_t)&recv_buf[0], rkey, scomplete_cb));
925 }
926
put_nb(std::string & send_buf,std::string & recv_buf,ucp_rkey_h rkey,std::vector<void * > & reqs)927 void put_nb(std::string& send_buf, std::string& recv_buf, ucp_rkey_h rkey,
928 std::vector<void*>& reqs)
929 {
930 reqs.push_back(ucp_put_nb(sender().ep(), &send_buf[0], send_buf.size(),
931 (uintptr_t)&recv_buf[0], rkey, scomplete_cb));
932 reqs.push_back(ucp_ep_flush_nb(sender().ep(), 0, scomplete_cb));
933 }
934
935 protected:
936 typedef void (test_ucp_sockaddr_protocols::*rma_nb_func_t)(
937 std::string&, std::string&, ucp_rkey_h, std::vector<void*>&);
938
compare_buffers(std::string & send_buf,std::string & recv_buf)939 void compare_buffers(std::string& send_buf, std::string& recv_buf)
940 {
941 EXPECT_TRUE(send_buf == recv_buf)
942 << "send_buf: '" << ucs::compact_string(send_buf, 20) << "', "
943 << "recv_buf: '" << ucs::compact_string(recv_buf, 20) << "'";
944 }
945
test_tag_send_recv(size_t size,bool is_exp,bool is_sync=false)946 void test_tag_send_recv(size_t size, bool is_exp, bool is_sync = false)
947 {
948 std::string send_buf(size, 'x');
949 std::string recv_buf(size, 'y');
950
951 void *rreq = NULL, *sreq = NULL;
952
953 if (is_exp) {
954 rreq = ucp_tag_recv_nb(receiver().worker(), &recv_buf[0], size,
955 ucp_dt_make_contig(1), 0, 0, rtag_complete_cb);
956 }
957
958 if (is_sync) {
959 sreq = ucp_tag_send_sync_nb(sender().ep(), &send_buf[0], size,
960 ucp_dt_make_contig(1), 0, scomplete_cb);
961 } else {
962 sreq = ucp_tag_send_nb(sender().ep(), &send_buf[0], size,
963 ucp_dt_make_contig(1), 0, scomplete_cb);
964 }
965
966 if (!is_exp) {
967 short_progress_loop();
968 rreq = ucp_tag_recv_nb(receiver().worker(), &recv_buf[0], size,
969 ucp_dt_make_contig(1), 0, 0, rtag_complete_cb);
970 }
971
972 wait(sreq);
973 wait(rreq);
974
975 compare_buffers(send_buf, recv_buf);
976 }
977
wait_for_server_ep()978 void wait_for_server_ep()
979 {
980 if (!test_ucp_sockaddr::wait_for_server_ep(false)) {
981 UCS_TEST_ABORT("server endpoint is not created");
982 }
983 }
984
test_stream_send_recv(size_t size,bool is_exp)985 void test_stream_send_recv(size_t size, bool is_exp)
986 {
987 std::string send_buf(size, 'x');
988 std::string recv_buf(size, 'y');
989 size_t recv_length;
990 void *rreq, *sreq;
991
992 if (is_exp) {
993 wait_for_server_ep();
994 rreq = ucp_stream_recv_nb(receiver().ep(), &recv_buf[0], size,
995 ucp_dt_make_contig(1), rstream_complete_cb,
996 &recv_length, UCP_STREAM_RECV_FLAG_WAITALL);
997 sreq = ucp_stream_send_nb(sender().ep(), &send_buf[0], size,
998 ucp_dt_make_contig(1), scomplete_cb, 0);
999 } else {
1000 sreq = ucp_stream_send_nb(sender().ep(), &send_buf[0], size,
1001 ucp_dt_make_contig(1), scomplete_cb, 0);
1002 short_progress_loop();
1003 wait_for_server_ep();
1004 rreq = ucp_stream_recv_nb(receiver().ep(), &recv_buf[0], size,
1005 ucp_dt_make_contig(1), rstream_complete_cb,
1006 &recv_length, UCP_STREAM_RECV_FLAG_WAITALL);
1007 }
1008
1009 wait(sreq);
1010 wait(rreq);
1011
1012 compare_buffers(send_buf, recv_buf);
1013 }
1014
register_mem(entity * initiator,entity * target,void * buffer,size_t length,ucp_mem_h * memh_p,ucp_rkey_h * rkey_p)1015 void register_mem(entity* initiator, entity* target, void *buffer,
1016 size_t length, ucp_mem_h *memh_p, ucp_rkey_h *rkey_p)
1017 {
1018 ucp_mem_map_params_t params = {0};
1019 params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
1020 UCP_MEM_MAP_PARAM_FIELD_LENGTH;
1021 params.address = buffer;
1022 params.length = length;
1023
1024 ucs_status_t status = ucp_mem_map(target->ucph(), ¶ms, memh_p);
1025 ASSERT_UCS_OK(status);
1026
1027 void *rkey_buffer;
1028 size_t rkey_buffer_size;
1029 status = ucp_rkey_pack(target->ucph(), *memh_p, &rkey_buffer,
1030 &rkey_buffer_size);
1031 ASSERT_UCS_OK(status);
1032
1033 status = ucp_ep_rkey_unpack(initiator->ep(), rkey_buffer, rkey_p);
1034 ASSERT_UCS_OK(status);
1035
1036 ucp_rkey_buffer_release(rkey_buffer);
1037 }
1038
test_rma(size_t size,rma_nb_func_t rma_func)1039 void test_rma(size_t size, rma_nb_func_t rma_func)
1040 {
1041 std::string send_buf(size, 'x');
1042 std::string recv_buf(size, 'y');
1043
1044 ucp_mem_h memh;
1045 ucp_rkey_h rkey;
1046
1047 register_mem(&sender(), &receiver(), &recv_buf[0], size, &memh, &rkey);
1048
1049 std::vector<void*> reqs;
1050 (this->*rma_func)(send_buf, recv_buf, rkey, reqs);
1051
1052 while (!reqs.empty()) {
1053 wait(reqs.back());
1054 reqs.pop_back();
1055 }
1056
1057 compare_buffers(send_buf, recv_buf);
1058
1059 ucp_rkey_destroy(rkey);
1060 ucs_status_t status = ucp_mem_unmap(receiver().ucph(), memh);
1061 ASSERT_UCS_OK(status);
1062 }
1063
test_am_send_recv(size_t size)1064 void test_am_send_recv(size_t size)
1065 {
1066 std::string sb(size, 'x');
1067
1068 bool am_received = false;
1069 ucp_worker_set_am_handler(receiver().worker(), 0,
1070 rx_am_msg_cb, &am_received, 0);
1071
1072 ucs_status_ptr_t sreq = ucp_am_send_nb(sender().ep(), 0, &sb[0], size,
1073 ucp_dt_make_contig(1),
1074 scomplete_cb, 0);
1075 wait(sreq);
1076 wait_for_flag(&am_received);
1077 EXPECT_TRUE(am_received);
1078
1079 ucp_worker_set_am_handler(receiver().worker(), 0, NULL, NULL, 0);
1080 }
1081
1082 private:
rx_am_msg_cb(void * arg,void * data,size_t length,ucp_ep_h reply_ep,unsigned flags)1083 static ucs_status_t rx_am_msg_cb(void *arg, void *data, size_t length,
1084 ucp_ep_h reply_ep, unsigned flags) {
1085 volatile bool *am_rx = reinterpret_cast<volatile bool*>(arg);
1086 EXPECT_FALSE(*am_rx);
1087 *am_rx = true;
1088 return UCS_OK;
1089 }
1090 };
1091
1092 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_exp,
1093 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1094 {
1095 test_tag_send_recv(4 * UCS_KBYTE, true);
1096 }
1097
1098 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_exp,
1099 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1100 {
1101 test_tag_send_recv(64 * UCS_KBYTE, true);
1102 }
1103
1104 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_exp_sync,
1105 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1106 {
1107 test_tag_send_recv(4 * UCS_KBYTE, true, true);
1108 }
1109
1110 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_exp_sync,
1111 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1112 {
1113 test_tag_send_recv(64 * UCS_KBYTE, true, true);
1114 }
1115
1116 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_rndv_exp, "RNDV_THRESH=10k")
1117 {
1118 test_tag_send_recv(64 * UCS_KBYTE, true);
1119 }
1120
1121 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_unexp,
1122 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1123 {
1124 test_tag_send_recv(4 * UCS_KBYTE, false);
1125 }
1126
1127 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_unexp,
1128 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1129 {
1130 test_tag_send_recv(64 * UCS_KBYTE, false);
1131 }
1132
1133 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_unexp_sync,
1134 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1135 {
1136 test_tag_send_recv(4 * UCS_KBYTE, false, true);
1137 }
1138
1139 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_unexp_sync,
1140 "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1141 {
1142 test_tag_send_recv(64 * UCS_KBYTE, false, true);
1143 }
1144
1145 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_rndv_unexp, "RNDV_THRESH=10k")
1146 {
1147 test_tag_send_recv(64 * UCS_KBYTE, false);
1148 }
1149
1150 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_4k_exp, "ZCOPY_THRESH=inf")
1151 {
1152 test_stream_send_recv(4 * UCS_KBYTE, true);
1153 }
1154
1155 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_4k_unexp,
1156 "ZCOPY_THRESH=inf")
1157 {
1158 test_stream_send_recv(4 * UCS_KBYTE, false);
1159 }
1160
1161 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_64k_exp, "ZCOPY_THRESH=inf")
1162 {
1163 test_stream_send_recv(64 * UCS_KBYTE, true);
1164 }
1165
1166 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_64k_unexp,
1167 "ZCOPY_THRESH=inf")
1168 {
1169 test_stream_send_recv(64 * UCS_KBYTE, false);
1170 }
1171
1172 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_zcopy_64k_exp, "ZCOPY_THRESH=2k")
1173 {
1174 test_stream_send_recv(64 * UCS_KBYTE, true);
1175 }
1176
1177 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_zcopy_64k_unexp,
1178 "ZCOPY_THRESH=2k")
1179 {
1180 test_stream_send_recv(64 * UCS_KBYTE, false);
1181 }
1182
/* 8-byte GET with the default thresholds. */
UCS_TEST_P(test_ucp_sockaddr_protocols, get_bcopy_small)
{
    test_rma(8 /* bytes */, &test_ucp_sockaddr_protocols::get_nb);
}
1187
1188 UCS_TEST_P(test_ucp_sockaddr_protocols, get_bcopy, "ZCOPY_THRESH=inf")
1189 {
1190 test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::get_nb);
1191 }
1192
1193 UCS_TEST_P(test_ucp_sockaddr_protocols, get_zcopy, "ZCOPY_THRESH=10k")
1194 {
1195 test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::get_nb);
1196 }
1197
/* 8-byte PUT (followed by an endpoint flush) with the default thresholds. */
UCS_TEST_P(test_ucp_sockaddr_protocols, put_bcopy_small)
{
    test_rma(8 /* bytes */, &test_ucp_sockaddr_protocols::put_nb);
}
1202
1203 UCS_TEST_P(test_ucp_sockaddr_protocols, put_bcopy, "ZCOPY_THRESH=inf")
1204 {
1205 test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::put_nb);
1206 }
1207
1208 UCS_TEST_P(test_ucp_sockaddr_protocols, put_zcopy, "ZCOPY_THRESH=10k")
1209 {
1210 test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::put_nb);
1211 }
1212
/* Single-byte active message with the default thresholds. */
UCS_TEST_P(test_ucp_sockaddr_protocols, am_short)
{
    test_am_send_recv(1 /* byte */);
}
1217
1218 UCS_TEST_P(test_ucp_sockaddr_protocols, am_bcopy_1k, "ZCOPY_THRESH=inf")
1219 {
1220 test_am_send_recv(1 * UCS_KBYTE);
1221 }
1222
1223 UCS_TEST_P(test_ucp_sockaddr_protocols, am_bcopy_64k, "ZCOPY_THRESH=inf")
1224 {
1225 test_am_send_recv(64 * UCS_KBYTE);
1226 }
1227
1228 UCS_TEST_P(test_ucp_sockaddr_protocols, am_zcopy_1k, "ZCOPY_THRESH=512")
1229 {
1230 test_am_send_recv(1 * UCS_KBYTE);
1231 }
1232
1233 UCS_TEST_P(test_ucp_sockaddr_protocols, am_zcopy_64k, "ZCOPY_THRESH=512")
1234 {
1235 test_am_send_recv(64 * UCS_KBYTE);
1236 }
1237
1238
1239 /* Only IB transports support CM for now
1240 * For DC case, allow fallback to UD if DC is not supported
1241 */
1242 #define UCP_INSTANTIATE_CM_TEST_CASE(_test_case) \
1243 UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, dcudx, "dc_x,ud") \
1244 UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, ud, "ud_v") \
1245 UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, udx, "ud_x") \
1246 UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, rc, "rc_v") \
1247 UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, rcx, "rc_x") \
1248 UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, ib, "ib")
1249
1250 UCP_INSTANTIATE_CM_TEST_CASE(test_ucp_sockaddr_protocols)
1251