1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2017.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #include "ucp_test.h"
8 #include "common/test.h"
9 #include "ucp/ucp_test.h"
10 
11 #include <common/test_helpers.h>
12 #include <ucs/sys/sys.h>
13 #include <ifaddrs.h>
14 #include <sys/poll.h>
15 
16 extern "C" {
17 #include <ucp/core/ucp_listener.h>
18 }
19 
20 #define UCP_INSTANTIATE_ALL_TEST_CASE(_test_case) \
21         UCP_INSTANTIATE_TEST_CASE (_test_case) \
22         UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, shm, "shm") \
23         UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, dc_ud, "dc_x,ud_v,ud_x,mm") \
24         UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, no_ud_ud_x, "dc_x,mm") \
25         /* dc_ud case is for testing handling of a large worker address on
26          * UCT_IFACE_FLAG_CONNECT_TO_IFACE transports (dc_x) */
27         /* no_ud_ud_x case is for testing handling a large worker address
28          * but with the lack of ud/ud_x transports, which would return an error
29          * and skipped */
30 
31 class test_ucp_sockaddr : public ucp_test {
32 public:
get_ctx_params()33     static ucp_params_t get_ctx_params() {
34         ucp_params_t params = ucp_test::get_ctx_params();
35         params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
36         params.features     = UCP_FEATURE_TAG | UCP_FEATURE_STREAM;
37         return params;
38     }
39 
40     enum {
41         CONN_REQ_TAG = DEFAULT_PARAM_VARIANT + 1,     /* Accepting by ucp_conn_request_h,
42                                                          send/recv by TAG API */
43         CONN_REQ_STREAM                               /* Accepting by ucp_conn_request_h,
44                                                          send/recv by STREAM API */
45     };
46 
47     enum {
48         TEST_MODIFIER_MASK      = UCS_MASK(16),
49         TEST_MODIFIER_MT        = UCS_BIT(16),
50         TEST_MODIFIER_CM        = UCS_BIT(17)
51     };
52 
53     enum {
54         SEND_DIRECTION_C2S  = UCS_BIT(0), /* send data from client to server */
55         SEND_DIRECTION_S2C  = UCS_BIT(1), /* send data from server to client */
56         SEND_DIRECTION_BIDI = SEND_DIRECTION_C2S | SEND_DIRECTION_S2C /* bidirectional send */
57     };
58 
59     typedef enum {
60         SEND_RECV_TAG,
61         SEND_RECV_STREAM
62     } send_recv_type_t;
63 
64     ucs::sock_addr_storage m_test_addr;
65 
init()66     void init() {
67         if (GetParam().variant & TEST_MODIFIER_CM) {
68             modify_config("SOCKADDR_CM_ENABLE", "yes");
69         }
70         get_sockaddr();
71         ucp_test::init();
72         skip_loopback();
73     }
74 
75     static void
enum_test_params_with_modifier(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls,std::vector<ucp_test_param> & result,unsigned modifier)76     enum_test_params_with_modifier(const ucp_params_t& ctx_params,
77                                       const std::string& name,
78                                       const std::string& test_case_name,
79                                       const std::string& tls,
80                                       std::vector<ucp_test_param> &result,
81                                       unsigned modifier)
82     {
83         generate_test_params_variant(ctx_params, name, test_case_name, tls,
84                                      modifier, result, SINGLE_THREAD);
85         generate_test_params_variant(ctx_params, name, test_case_name, tls,
86                                      modifier | TEST_MODIFIER_MT, result,
87                                      MULTI_THREAD_WORKER);
88     }
89 
90     static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)91     enum_test_params(const ucp_params_t& ctx_params,
92                      const std::string& name,
93                      const std::string& test_case_name,
94                      const std::string& tls)
95     {
96         std::vector<ucp_test_param> result =
97             ucp_test::enum_test_params(ctx_params, name, test_case_name, tls);
98 
99         enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
100                                        result, CONN_REQ_TAG);
101         enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
102                                        result, CONN_REQ_TAG | TEST_MODIFIER_CM);
103         enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
104                                        result, CONN_REQ_STREAM);
105         enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
106                                        result, CONN_REQ_STREAM | TEST_MODIFIER_CM);
107         return result;
108     }
109 
110     static ucs_log_func_rc_t
detect_error_logger(const char * file,unsigned line,const char * function,ucs_log_level_t level,const ucs_log_component_config_t * comp_conf,const char * message,va_list ap)111     detect_error_logger(const char *file, unsigned line, const char *function,
112                         ucs_log_level_t level,
113                         const ucs_log_component_config_t *comp_conf,
114                         const char *message, va_list ap)
115     {
116         if (level == UCS_LOG_LEVEL_ERROR) {
117             static std::vector<std::string> stop_list;
118             if (stop_list.empty()) {
119                 stop_list.push_back("no supported sockaddr auxiliary transports found for");
120                 stop_list.push_back("sockaddr aux resources addresses");
121                 stop_list.push_back("no peer failure handler");
122                 stop_list.push_back("connection request failed on listener");
123                 /* when the "peer failure" error happens, it is followed by: */
124                 stop_list.push_back("received event RDMA_CM_EVENT_UNREACHABLE");
125                 stop_list.push_back(ucs_status_string(UCS_ERR_UNREACHABLE));
126                 stop_list.push_back(ucs_status_string(UCS_ERR_UNSUPPORTED));
127             }
128 
129             std::string err_str = format_message(message, ap);
130             for (size_t i = 0; i < stop_list.size(); ++i) {
131                 if (err_str.find(stop_list[i]) != std::string::npos) {
132                     UCS_TEST_MESSAGE << err_str;
133                     return UCS_LOG_FUNC_RC_STOP;
134                 }
135             }
136         }
137         return UCS_LOG_FUNC_RC_CONTINUE;
138     }
139 
get_sockaddr()140     void get_sockaddr() {
141         std::vector<ucs::sock_addr_storage> saddrs;
142         struct ifaddrs* ifaddrs;
143         ucs_status_t status;
144         size_t size;
145         int ret = getifaddrs(&ifaddrs);
146         ASSERT_EQ(ret, 0);
147 
148         for (struct ifaddrs *ifa = ifaddrs; ifa != NULL; ifa = ifa->ifa_next) {
149             if (ucs_netif_flags_is_active(ifa->ifa_flags) &&
150                 ucs::is_inet_addr(ifa->ifa_addr) &&
151                 ucs::is_rdmacm_netdev(ifa->ifa_name))
152             {
153                 saddrs.push_back(ucs::sock_addr_storage());
154                 status = ucs_sockaddr_sizeof(ifa->ifa_addr, &size);
155                 ASSERT_UCS_OK(status);
156                 saddrs.back().set_sock_addr(*ifa->ifa_addr, size);
157                 saddrs.back().set_port(0); /* listen on any port then update */
158             }
159         }
160 
161         freeifaddrs(ifaddrs);
162 
163         if (saddrs.empty()) {
164             UCS_TEST_SKIP_R("No interface for testing");
165         }
166 
167         static const std::string dc_tls[] = { "dc", "dc_x", "ib" };
168 
169         bool has_dc = has_any_transport(
170             std::vector<std::string>(dc_tls, dc_tls + ucs_array_size(dc_tls)));
171 
172         /* FIXME: select random interface, except for DC transport, which do not
173                   yet support having different gid_index for different UCT
174                   endpoints on same iface */
175         int saddr_idx = has_dc ? 0 : (ucs::rand() % saddrs.size());
176         m_test_addr   = saddrs[saddr_idx];
177     }
178 
start_listener(ucp_test_base::entity::listen_cb_type_t cb_type)179     void start_listener(ucp_test_base::entity::listen_cb_type_t cb_type)
180     {
181         ucs_time_t deadline = ucs::get_deadline();
182         ucs_status_t status;
183 
184         do {
185             status = receiver().listen(cb_type, m_test_addr.get_sock_addr_ptr(),
186                                        m_test_addr.get_addr_size(),
187                                        get_server_ep_params());
188         } while ((status == UCS_ERR_BUSY) && (ucs_get_time() < deadline));
189 
190         if (status == UCS_ERR_UNREACHABLE) {
191             UCS_TEST_SKIP_R("cannot listen to " + m_test_addr.to_str());
192         }
193 
194         ASSERT_UCS_OK(status);
195         ucp_listener_attr_t attr;
196         uint16_t            port;
197 
198         attr.field_mask = UCP_LISTENER_ATTR_FIELD_SOCKADDR;
199         ASSERT_UCS_OK(ucp_listener_query(receiver().listenerh(), &attr));
200         ASSERT_UCS_OK(ucs_sockaddr_get_port(
201                         (const struct sockaddr *)&attr.sockaddr, &port));
202         m_test_addr.set_port(port);
203         UCS_TEST_MESSAGE << "server listening on " << m_test_addr.to_str();
204     }
205 
scomplete_cb(void * req,ucs_status_t status)206     static void scomplete_cb(void *req, ucs_status_t status)
207     {
208         if ((status == UCS_OK)              ||
209             (status == UCS_ERR_UNREACHABLE) ||
210             (status == UCS_ERR_REJECTED)) {
211             return;
212         }
213         UCS_TEST_ABORT("Error: " << ucs_status_string(status));
214     }
215 
rtag_complete_cb(void * req,ucs_status_t status,ucp_tag_recv_info_t * info)216     static void rtag_complete_cb(void *req, ucs_status_t status,
217                                  ucp_tag_recv_info_t *info)
218     {
219         EXPECT_UCS_OK(status);
220     }
221 
rstream_complete_cb(void * req,ucs_status_t status,size_t length)222     static void rstream_complete_cb(void *req, ucs_status_t status,
223                                     size_t length)
224     {
225         EXPECT_UCS_OK(status);
226     }
227 
wait_for_wakeup(ucp_worker_h send_worker,ucp_worker_h recv_worker)228     static void wait_for_wakeup(ucp_worker_h send_worker, ucp_worker_h recv_worker)
229     {
230         int ret, send_efd, recv_efd;
231         ucs_status_t status;
232 
233         ASSERT_UCS_OK(ucp_worker_get_efd(send_worker, &send_efd));
234         ASSERT_UCS_OK(ucp_worker_get_efd(recv_worker, &recv_efd));
235 
236         status = ucp_worker_arm(recv_worker);
237         if (status == UCS_ERR_BUSY) {
238             return;
239         }
240         ASSERT_UCS_OK(status);
241 
242         status = ucp_worker_arm(send_worker);
243         if (status == UCS_ERR_BUSY) {
244             return;
245         }
246         ASSERT_UCS_OK(status);
247 
248         do {
249             struct pollfd pfd[2];
250             pfd[0].fd     = send_efd;
251             pfd[1].fd     = recv_efd;
252             pfd[0].events = POLLIN;
253             pfd[1].events = POLLIN;
254             ret = poll(pfd, 2, -1);
255         } while ((ret < 0) && (errno == EINTR));
256         if (ret < 0) {
257             UCS_TEST_MESSAGE << "poll() failed: " << strerror(errno);
258         }
259 
260         EXPECT_GE(ret, 1);
261     }
262 
check_events(ucp_worker_h send_worker,ucp_worker_h recv_worker,bool wakeup,void * req)263     void check_events(ucp_worker_h send_worker, ucp_worker_h recv_worker,
264                       bool wakeup, void *req)
265     {
266         if (progress()) {
267             return;
268         }
269 
270         if ((req != NULL) && (ucp_request_check_status(req) == UCS_ERR_UNREACHABLE)) {
271             return;
272         }
273 
274         if (wakeup) {
275             wait_for_wakeup(send_worker, recv_worker);
276         }
277     }
278 
send_recv(entity & from,entity & to,send_recv_type_t send_recv_type,bool wakeup,ucp_test_base::entity::listen_cb_type_t cb_type)279     void send_recv(entity& from, entity& to, send_recv_type_t send_recv_type,
280                    bool wakeup, ucp_test_base::entity::listen_cb_type_t cb_type)
281     {
282         const uint64_t send_data = ucs_generate_uuid(0);
283         void *send_req = NULL;
284         if (send_recv_type == SEND_RECV_TAG) {
285             send_req = ucp_tag_send_nb(from.ep(), &send_data, 1,
286                                        ucp_dt_make_contig(sizeof(send_data)), 1,
287                                        scomplete_cb);
288         } else if (send_recv_type == SEND_RECV_STREAM) {
289             send_req = ucp_stream_send_nb(from.ep(), &send_data, 1,
290                                           ucp_dt_make_contig(sizeof(send_data)),
291                                           scomplete_cb, 0);
292         } else {
293             ASSERT_TRUE(false) << "unsupported communication type";
294         }
295 
296         ucs_status_t send_status;
297         if (send_req == NULL) {
298             send_status = UCS_OK;
299         } else if (UCS_PTR_IS_ERR(send_req)) {
300             send_status = UCS_PTR_STATUS(send_req);
301             ASSERT_UCS_OK(send_status);
302         } else {
303             while (!ucp_request_is_completed(send_req)) {
304                 check_events(from.worker(), to.worker(), wakeup, send_req);
305             }
306             send_status = ucp_request_check_status(send_req);
307             ucp_request_free(send_req);
308         }
309 
310         if (send_status == UCS_ERR_UNREACHABLE) {
311             /* Check if the error was completed due to the error handling flow.
312              * If so, skip the test since a valid error occurred - the one expected
313              * from the error handling flow - cases of failure to handle long worker
314              * address or transport doesn't support the error handling requirement */
315             UCS_TEST_SKIP_R("Skipping due an unreachable destination (unsupported "
316                             "feature or too long worker address or no "
317                             "supported transport to send partial worker "
318                             "address)");
319         } else if ((send_status == UCS_ERR_REJECTED) &&
320                    (cb_type == ucp_test_base::entity::LISTEN_CB_REJECT)) {
321             return;
322         } else {
323             ASSERT_UCS_OK(send_status);
324         }
325 
326         uint64_t recv_data = 0;
327         void *recv_req;
328         if (send_recv_type == SEND_RECV_TAG) {
329             recv_req = ucp_tag_recv_nb(to.worker(), &recv_data, 1,
330                                        ucp_dt_make_contig(sizeof(recv_data)),
331                                        1, 0, rtag_complete_cb);
332         } else {
333             ASSERT_TRUE(send_recv_type == SEND_RECV_STREAM);
334             ucp_stream_poll_ep_t poll_eps;
335             ssize_t              ep_count;
336             size_t               recv_length;
337             do {
338                 progress();
339                 ep_count = ucp_stream_worker_poll(to.worker(), &poll_eps, 1, 0);
340             } while (ep_count == 0);
341             ASSERT_EQ(1,       ep_count);
342             EXPECT_EQ(to.ep(), poll_eps.ep);
343             EXPECT_EQ(&to,     poll_eps.user_data);
344 
345             recv_req = ucp_stream_recv_nb(to.ep(), &recv_data, 1,
346                                           ucp_dt_make_contig(sizeof(recv_data)),
347                                           rstream_complete_cb, &recv_length,
348                                           UCP_STREAM_RECV_FLAG_WAITALL);
349         }
350 
351         if (recv_req != NULL) {
352             ASSERT_TRUE(UCS_PTR_IS_PTR(recv_req));
353             while (!ucp_request_is_completed(recv_req)) {
354                 check_events(from.worker(), to.worker(), wakeup, recv_req);
355             }
356             ucp_request_free(recv_req);
357         }
358 
359         EXPECT_EQ(send_data, recv_data);
360     }
361 
wait_for_server_ep(bool wakeup)362     bool wait_for_server_ep(bool wakeup)
363     {
364         ucs_time_t deadline = ucs::get_deadline();
365 
366         while ((receiver().get_num_eps() == 0) &&
367                (sender().get_err_num() == 0) && (ucs_get_time() < deadline)) {
368             check_events(sender().worker(), receiver().worker(), wakeup, NULL);
369         }
370 
371         return (sender().get_err_num() == 0) && (receiver().get_num_eps() > 0);
372     }
373 
wait_for_reject(entity & e,bool wakeup)374     void wait_for_reject(entity &e, bool wakeup)
375     {
376         ucs_time_t deadline = ucs::get_deadline();
377 
378         while ((e.get_err_num_rejected() == 0) && (ucs_get_time() < deadline)) {
379             check_events(sender().worker(), receiver().worker(), wakeup, NULL);
380         }
381 
382         EXPECT_GT(deadline, ucs_get_time());
383         EXPECT_EQ(1ul, e.get_err_num_rejected());
384     }
385 
get_ep_params()386     virtual ucp_ep_params_t get_ep_params()
387     {
388         ucp_ep_params_t ep_params = ucp_test::get_ep_params();
389         ep_params.field_mask      |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
390                                      UCP_EP_PARAM_FIELD_ERR_HANDLER;
391         /* The error handling requirement is needed since we need to take
392          * care of a case where the client gets an error. In case ucp needs to
393          * handle a large worker address but neither ud nor ud_x are present */
394         ep_params.err_mode         = UCP_ERR_HANDLING_MODE_PEER;
395         ep_params.err_handler.cb   = err_handler_cb;
396         ep_params.err_handler.arg  = NULL;
397         return ep_params;
398     }
399 
get_server_ep_params()400     virtual ucp_ep_params_t get_server_ep_params() {
401         return get_ep_params();
402     }
403 
client_ep_connect()404     void client_ep_connect()
405     {
406         ucp_ep_params_t ep_params = get_ep_params();
407         ep_params.field_mask      |= UCP_EP_PARAM_FIELD_FLAGS |
408                                      UCP_EP_PARAM_FIELD_SOCK_ADDR |
409                                      UCP_EP_PARAM_FIELD_USER_DATA;
410         ep_params.flags            = UCP_EP_PARAMS_FLAGS_CLIENT_SERVER;
411         ep_params.sockaddr.addr    = m_test_addr.get_sock_addr_ptr();
412         ep_params.sockaddr.addrlen = m_test_addr.get_addr_size();
413         ep_params.user_data        = &sender();
414         sender().connect(&receiver(), ep_params);
415     }
416 
connect_and_send_recv(bool wakeup,uint64_t flags)417     void connect_and_send_recv(bool wakeup, uint64_t flags)
418     {
419         {
420             scoped_log_handler slh(detect_error_logger);
421             client_ep_connect();
422             if (!wait_for_server_ep(wakeup)) {
423                 UCS_TEST_SKIP_R("cannot connect to server");
424             }
425         }
426 
427         if (flags & SEND_DIRECTION_C2S) {
428             send_recv(sender(), receiver(), send_recv_type(), wakeup,
429                       cb_type());
430         }
431 
432         if (flags & SEND_DIRECTION_S2C) {
433             send_recv(receiver(), sender(), send_recv_type(), wakeup,
434                       cb_type());
435         }
436     }
437 
connect_and_reject(bool wakeup)438     void connect_and_reject(bool wakeup)
439     {
440         {
441             scoped_log_handler slh(detect_error_logger);
442             client_ep_connect();
443             /* Check reachability with tagged send */
444             send_recv(sender(), receiver(), SEND_RECV_TAG, wakeup,
445                       ucp_test_base::entity::LISTEN_CB_REJECT);
446         }
447         wait_for_reject(receiver(), wakeup);
448         wait_for_reject(sender(),   wakeup);
449     }
450 
listen_and_communicate(bool wakeup,uint64_t flags)451     void listen_and_communicate(bool wakeup, uint64_t flags)
452     {
453         UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
454 
455         start_listener(cb_type());
456         connect_and_send_recv(wakeup, flags);
457     }
458 
listen_and_reject(bool wakeup)459     void listen_and_reject(bool wakeup)
460     {
461         UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
462 
463         start_listener(ucp_test_base::entity::LISTEN_CB_REJECT);
464         connect_and_reject(wakeup);
465     }
466 
one_sided_disconnect(entity & e,enum ucp_ep_close_mode mode)467     void one_sided_disconnect(entity &e, enum ucp_ep_close_mode mode) {
468         void *req           = e.disconnect_nb(0, 0, mode);
469         ucs_time_t deadline = ucs_time_from_sec(10.0) + ucs_get_time();
470         while (!is_request_completed(req) && (ucs_get_time() < deadline)) {
471             /* TODO: replace the progress() with e().progress() when
472                      async progress is implemented. */
473             progress();
474         };
475 
476         e.close_ep_req_free(req);
477     }
478 
concurrent_disconnect(enum ucp_ep_close_mode mode)479     void concurrent_disconnect(enum ucp_ep_close_mode mode) {
480         ASSERT_EQ(2ul, entities().size());
481         ASSERT_EQ(1, sender().get_num_workers());
482         ASSERT_EQ(1, sender().get_num_eps());
483         ASSERT_EQ(1, receiver().get_num_workers());
484         ASSERT_EQ(1, receiver().get_num_eps());
485 
486         void *sender_ep_close_req   = sender().disconnect_nb(0, 0, mode);
487         void *receiver_ep_close_req = receiver().disconnect_nb(0, 0, mode);
488 
489         ucs_time_t deadline = ucs::get_deadline();
490         while ((!is_request_completed(sender_ep_close_req) ||
491                 !is_request_completed(receiver_ep_close_req)) &&
492                (ucs_get_time() < deadline)) {
493             progress();
494         }
495 
496         sender().close_ep_req_free(sender_ep_close_req);
497         receiver().close_ep_req_free(receiver_ep_close_req);
498     }
499 
err_handler_cb(void * arg,ucp_ep_h ep,ucs_status_t status)500     static void err_handler_cb(void *arg, ucp_ep_h ep, ucs_status_t status) {
501         ucp_test::err_handler_cb(arg, ep, status);
502 
503         /* The current expected errors are only from the err_handle test
504          * and from transports where the worker address is too long but ud/ud_x
505          * are not present, or ud/ud_x are present but their addresses are too
506          * long as well, in addition we can get disconnect events during test
507          * teardown.
508          */
509         switch (status) {
510         case UCS_ERR_REJECTED:
511         case UCS_ERR_UNREACHABLE:
512         case UCS_ERR_CONNECTION_RESET:
513             UCS_TEST_MESSAGE << "ignoring error " <<ucs_status_string(status)
514                              << " on endpoint " << ep;
515             return;
516         default:
517             UCS_TEST_ABORT("Error: " << ucs_status_string(status));
518         }
519     }
520 
521 protected:
cb_type() const522     ucp_test_base::entity::listen_cb_type_t cb_type() const {
523         const int variant = (GetParam().variant & TEST_MODIFIER_MASK);
524         if ((variant == CONN_REQ_TAG) || (variant == CONN_REQ_STREAM)) {
525             return ucp_test_base::entity::LISTEN_CB_CONN;
526         }
527         return ucp_test_base::entity::LISTEN_CB_EP;
528     }
529 
send_recv_type() const530     send_recv_type_t send_recv_type() const {
531         switch (GetParam().variant & TEST_MODIFIER_MASK) {
532         case CONN_REQ_STREAM:
533             return SEND_RECV_STREAM;
534         case CONN_REQ_TAG:
535             /* fallthrough */
536         default:
537             return SEND_RECV_TAG;
538         }
539     }
540 
nonparameterized_test() const541     bool nonparameterized_test() const {
542         return (GetParam().variant != DEFAULT_PARAM_VARIANT) &&
543                (GetParam().variant != (CONN_REQ_TAG | TEST_MODIFIER_CM));
544     }
545 
no_close_protocol() const546     bool no_close_protocol() const {
547         return !(GetParam().variant & TEST_MODIFIER_CM);
548     }
549 };
550 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,listen,no_close_protocol ())551 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, listen, no_close_protocol()) {
552     listen_and_communicate(false, 0);
553 }
554 
UCS_TEST_P(test_ucp_sockaddr,listen_c2s)555 UCS_TEST_P(test_ucp_sockaddr, listen_c2s) {
556     listen_and_communicate(false, SEND_DIRECTION_C2S);
557 }
558 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,listen_s2c,no_close_protocol ())559 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, listen_s2c, no_close_protocol()) {
560     listen_and_communicate(false, SEND_DIRECTION_S2C);
561 }
562 
UCS_TEST_P(test_ucp_sockaddr,listen_bidi)563 UCS_TEST_P(test_ucp_sockaddr, listen_bidi) {
564     listen_and_communicate(false, SEND_DIRECTION_BIDI);
565 }
566 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,onesided_disconnect,no_close_protocol ())567 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect,
568                      no_close_protocol()) {
569     listen_and_communicate(false, 0);
570     one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
571 }
572 
UCS_TEST_P(test_ucp_sockaddr,onesided_disconnect_c2s)573 UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect_c2s) {
574     listen_and_communicate(false, SEND_DIRECTION_C2S);
575     one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
576 }
577 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,onesided_disconnect_s2c,no_close_protocol ())578 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, onesided_disconnect_s2c,
579                      no_close_protocol()) {
580     listen_and_communicate(false, SEND_DIRECTION_S2C);
581     one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
582 }
583 
UCS_TEST_P(test_ucp_sockaddr,onesided_disconnect_bidi)584 UCS_TEST_P(test_ucp_sockaddr, onesided_disconnect_bidi) {
585     listen_and_communicate(false, SEND_DIRECTION_BIDI);
586     one_sided_disconnect(sender(), UCP_EP_CLOSE_MODE_FLUSH);
587 }
588 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,concurrent_disconnect,no_close_protocol ())589 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect,
590                      no_close_protocol()) {
591     listen_and_communicate(false, 0);
592     concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
593 }
594 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,concurrent_disconnect_c2s,no_close_protocol ())595 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_c2s,
596                      no_close_protocol()) {
597     listen_and_communicate(false, SEND_DIRECTION_C2S);
598     concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
599 }
600 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,concurrent_disconnect_s2c,no_close_protocol ())601 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_s2c,
602                      no_close_protocol()) {
603     listen_and_communicate(false, SEND_DIRECTION_S2C);
604     concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
605 }
606 
UCS_TEST_P(test_ucp_sockaddr,concurrent_disconnect_bidi)607 UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_bidi) {
608     listen_and_communicate(false, SEND_DIRECTION_BIDI);
609     concurrent_disconnect(UCP_EP_CLOSE_MODE_FLUSH);
610 }
611 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,concurrent_disconnect_force,no_close_protocol ())612 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force,
613                      no_close_protocol()) {
614     listen_and_communicate(false, 0);
615     concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
616 }
617 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,concurrent_disconnect_force_c2s,no_close_protocol ())618 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force_c2s,
619                      no_close_protocol()) {
620     listen_and_communicate(false, SEND_DIRECTION_C2S);
621     concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
622 }
623 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,concurrent_disconnect_force_s2c,no_close_protocol ())624 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, concurrent_disconnect_force_s2c,
625                      no_close_protocol()) {
626     listen_and_communicate(false, SEND_DIRECTION_S2C);
627     concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
628 }
629 
UCS_TEST_P(test_ucp_sockaddr,concurrent_disconnect_force_bidi)630 UCS_TEST_P(test_ucp_sockaddr, concurrent_disconnect_force_bidi) {
631     listen_and_communicate(false, SEND_DIRECTION_BIDI);
632     concurrent_disconnect(UCP_EP_CLOSE_MODE_FORCE);
633 }
634 
UCS_TEST_P(test_ucp_sockaddr,listen_inaddr_any)635 UCS_TEST_P(test_ucp_sockaddr, listen_inaddr_any) {
636     /* save testing address */
637     ucs::sock_addr_storage test_addr(m_test_addr);
638     m_test_addr.reset_to_any();
639 
640     UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
641 
642     start_listener(cb_type());
643     /* get the actual port which was selected by listener */
644     test_addr.set_port(m_test_addr.get_port());
645     /* restore address */
646     m_test_addr = test_addr;
647     connect_and_send_recv(false, SEND_DIRECTION_C2S);
648 }
649 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr,reject,nonparameterized_test ())650 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, reject, nonparameterized_test()) {
651     listen_and_reject(false);
652 }
653 
UCS_TEST_P(test_ucp_sockaddr,listener_query)654 UCS_TEST_P(test_ucp_sockaddr, listener_query) {
655     ucp_listener_attr_t listener_attr;
656     ucs_status_t status;
657 
658     listener_attr.field_mask = UCP_LISTENER_ATTR_FIELD_SOCKADDR;
659 
660     UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
661 
662     start_listener(cb_type());
663     status = ucp_listener_query(receiver().listenerh(), &listener_attr);
664     EXPECT_UCS_OK(status);
665 
666     EXPECT_EQ(m_test_addr, listener_attr.sockaddr);
667 }
668 
UCS_TEST_P(test_ucp_sockaddr,err_handle)669 UCS_TEST_P(test_ucp_sockaddr, err_handle) {
670 
671     ucs::sock_addr_storage listen_addr(m_test_addr.to_ucs_sock_addr());
672     ucs_status_t status = receiver().listen(cb_type(),
673                                             m_test_addr.get_sock_addr_ptr(),
674                                             m_test_addr.get_addr_size(),
675                                             get_server_ep_params());
676     if (status == UCS_ERR_UNREACHABLE) {
677         UCS_TEST_SKIP_R("cannot listen to " + m_test_addr.to_str());
678     }
679 
680     /* make the client try to connect to a non-existing port on the server side */
681     m_test_addr.set_port(1);
682 
683     {
684         scoped_log_handler slh(wrap_errors_logger);
685         client_ep_connect();
686         /* allow for the unreachable event to arrive before restoring errors */
687         wait_for_flag(&sender().get_err_num());
688     }
689 
690     EXPECT_EQ(1u, sender().get_err_num());
691 }
692 
693 UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr)
694 
695 class test_ucp_sockaddr_destroy_ep_on_err : public test_ucp_sockaddr {
696 public:
test_ucp_sockaddr_destroy_ep_on_err()697     test_ucp_sockaddr_destroy_ep_on_err() {
698         /* Set small TL timeouts to reduce testing time */
699         m_env.push_back(new ucs::scoped_setenv("UCX_RC_TIMEOUT",     "10ms"));
700         m_env.push_back(new ucs::scoped_setenv("UCX_RC_RNR_TIMEOUT", "10ms"));
701         m_env.push_back(new ucs::scoped_setenv("UCX_RC_RETRY_COUNT", "2"));
702     }
703 
get_server_ep_params()704     virtual ucp_ep_params_t get_server_ep_params() {
705         ucp_ep_params_t params = test_ucp_sockaddr::get_server_ep_params();
706 
707         params.field_mask      |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
708                                  UCP_EP_PARAM_FIELD_ERR_HANDLER        |
709                                  UCP_EP_PARAM_FIELD_USER_DATA;
710         params.err_mode         = UCP_ERR_HANDLING_MODE_PEER;
711         params.err_handler.cb   = err_handler_cb;
712         params.err_handler.arg  = NULL;
713         params.user_data        = &receiver();
714         return params;
715     }
716 
err_handler_cb(void * arg,ucp_ep_h ep,ucs_status_t status)717     static void err_handler_cb(void *arg, ucp_ep_h ep, ucs_status_t status) {
718         test_ucp_sockaddr::err_handler_cb(arg, ep, status);
719         entity *e = reinterpret_cast<entity *>(arg);
720         e->disconnect_nb(0, 0, UCP_EP_CLOSE_MODE_FORCE);
721     }
722 
723 private:
724     ucs::ptr_vector<ucs::scoped_setenv> m_env;
725 };
726 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,empty,no_close_protocol ())727 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, empty,
728                      no_close_protocol()) {
729     listen_and_communicate(false, 0);
730 }
731 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,s2c,no_close_protocol ())732 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, s2c,
733                      no_close_protocol()) {
734     listen_and_communicate(false, SEND_DIRECTION_S2C);
735 }
736 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,c2s,no_close_protocol ())737 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, c2s,
738                      no_close_protocol()) {
739     listen_and_communicate(false, SEND_DIRECTION_C2S);
740 }
741 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,bidi,no_close_protocol ())742 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, bidi,
743                      no_close_protocol()) {
744     listen_and_communicate(false, SEND_DIRECTION_BIDI);
745 }
746 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_client_cforce,no_close_protocol ())747 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_cforce,
748                      no_close_protocol()) {
749     listen_and_communicate(false, 0);
750     scoped_log_handler slh(wrap_errors_logger);
751     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FORCE);
752     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
753 }
754 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_c2s_cforce,no_close_protocol ())755 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_cforce,
756                      no_close_protocol()) {
757     listen_and_communicate(false, SEND_DIRECTION_C2S);
758     scoped_log_handler slh(wrap_errors_logger);
759     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FORCE);
760     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
761 }
762 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_s2c_cforce,no_close_protocol ())763 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_cforce,
764                      no_close_protocol()) {
765     listen_and_communicate(false, SEND_DIRECTION_S2C);
766     scoped_log_handler slh(wrap_errors_logger);
767     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FORCE);
768     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
769 }
770 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_bidi_cforce,no_close_protocol ())771 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_cforce,
772                      no_close_protocol()) {
773     listen_and_communicate(false, SEND_DIRECTION_BIDI);
774     scoped_log_handler slh(wrap_errors_logger);
775     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FORCE);
776     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FLUSH);
777 }
778 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_client_sforce,no_close_protocol ())779 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_client_sforce,
780                      no_close_protocol()) {
781     listen_and_communicate(false, 0);
782     scoped_log_handler slh(wrap_errors_logger);
783     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
784     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FLUSH);
785 }
786 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_c2s_sforce,no_close_protocol ())787 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_c2s_sforce,
788                      no_close_protocol()) {
789     listen_and_communicate(false, SEND_DIRECTION_C2S);
790     scoped_log_handler slh(wrap_errors_logger);
791     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
792     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FLUSH);
793 }
794 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_s2c_sforce,no_close_protocol ())795 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_s2c_sforce,
796                      no_close_protocol()) {
797     listen_and_communicate(false, SEND_DIRECTION_S2C);
798     scoped_log_handler slh(wrap_errors_logger);
799     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
800     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FLUSH);
801 }
802 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err,onesided_bidi_sforce,no_close_protocol ())803 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_destroy_ep_on_err, onesided_bidi_sforce,
804                      no_close_protocol()) {
805     listen_and_communicate(false, SEND_DIRECTION_BIDI);
806     scoped_log_handler slh(wrap_errors_logger);
807     one_sided_disconnect(receiver(), UCP_EP_CLOSE_MODE_FORCE);
808     one_sided_disconnect(sender(),   UCP_EP_CLOSE_MODE_FLUSH);
809 }
810 
811 UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr_destroy_ep_on_err)
812 
813 class test_ucp_sockaddr_with_wakeup : public test_ucp_sockaddr {
814 public:
get_ctx_params()815     static ucp_params_t get_ctx_params() {
816         ucp_params_t params = test_ucp_sockaddr::get_ctx_params();
817         params.features    |= UCP_FEATURE_WAKEUP;
818         return params;
819     }
820 };
821 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup,wakeup,no_close_protocol ())822 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, wakeup,
823                      no_close_protocol()) {
824     listen_and_communicate(true, 0);
825 }
826 
UCS_TEST_P(test_ucp_sockaddr_with_wakeup,wakeup_c2s)827 UCS_TEST_P(test_ucp_sockaddr_with_wakeup, wakeup_c2s) {
828     listen_and_communicate(true, SEND_DIRECTION_C2S);
829 }
830 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup,wakeup_s2c,no_close_protocol ())831 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, wakeup_s2c,
832                      no_close_protocol()) {
833     listen_and_communicate(true, SEND_DIRECTION_S2C);
834 }
835 
UCS_TEST_P(test_ucp_sockaddr_with_wakeup,wakeup_bidi)836 UCS_TEST_P(test_ucp_sockaddr_with_wakeup, wakeup_bidi) {
837     listen_and_communicate(true, SEND_DIRECTION_BIDI);
838 }
839 
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup,reject,nonparameterized_test ())840 UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_with_wakeup, reject,
841                      nonparameterized_test()) {
842     listen_and_reject(true);
843 }
844 
845 UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr_with_wakeup)
846 
847 
848 class test_ucp_sockaddr_with_rma_atomic : public test_ucp_sockaddr {
849 public:
850 
get_ctx_params()851     static ucp_params_t get_ctx_params() {
852         ucp_params_t params = test_ucp_sockaddr::get_ctx_params();
853         params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
854         params.features    |= UCP_FEATURE_RMA   |
855                               UCP_FEATURE_AMO32 |
856                               UCP_FEATURE_AMO64;
857         return params;
858     }
859 };
860 
UCS_TEST_P(test_ucp_sockaddr_with_rma_atomic,wireup)861 UCS_TEST_P(test_ucp_sockaddr_with_rma_atomic, wireup) {
862 
863     /* This test makes sure that the client-server flow works when the required
864      * features are RMA/ATOMIC. With these features, need to make sure that
865      * there is a lane for ucp-wireup (an am_lane should be created and used) */
866     UCS_TEST_MESSAGE << "Testing " << m_test_addr.to_str();
867 
868     start_listener(cb_type());
869     {
870         scoped_log_handler slh(wrap_errors_logger);
871 
872         client_ep_connect();
873 
874         /* allow the err_handler callback to be invoked if needed */
875         if (!wait_for_server_ep(false)) {
876             EXPECT_EQ(1ul, sender().get_err_num());
877             UCS_TEST_SKIP_R("cannot connect to server");
878         }
879 
880         EXPECT_EQ(0ul, sender().get_err_num());
881         /* even if server EP is created, in case of long address, wireup will be
882          * done later, need to communicate */
883         send_recv(sender(), receiver(), send_recv_type(), false, cb_type());
884     }
885 }
886 
887 UCP_INSTANTIATE_ALL_TEST_CASE(test_ucp_sockaddr_with_rma_atomic)
888 
889 
890 class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
891 public:
get_ctx_params()892     static ucp_params_t get_ctx_params() {
893         ucp_params_t params = test_ucp_sockaddr::get_ctx_params();
894         params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
895         params.features    |= UCP_FEATURE_RMA | UCP_FEATURE_AM;
896         /* Atomics not supported for now because need to emulate the case
897          * of using different device than the one selected by default on the
898          * worker for atomic operations */
899         return params;
900     }
901 
902     static std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)903     enum_test_params(const ucp_params_t& ctx_params,
904                      const std::string& name,
905                      const std::string& test_case_name,
906                      const std::string& tls)
907     {
908         std::vector<ucp_test_param> result;
909         enum_test_params_with_modifier(ctx_params, name, test_case_name, tls,
910                                        result, TEST_MODIFIER_CM);
911         return result;
912     }
913 
init()914     virtual void init() {
915         test_ucp_sockaddr::init();
916         start_listener(cb_type());
917         client_ep_connect();
918     }
919 
get_nb(std::string & send_buf,std::string & recv_buf,ucp_rkey_h rkey,std::vector<void * > & reqs)920     void get_nb(std::string& send_buf, std::string& recv_buf, ucp_rkey_h rkey,
921                 std::vector<void*>& reqs)
922     {
923          reqs.push_back(ucp_get_nb(sender().ep(), &send_buf[0], send_buf.size(),
924                                    (uintptr_t)&recv_buf[0], rkey, scomplete_cb));
925     }
926 
put_nb(std::string & send_buf,std::string & recv_buf,ucp_rkey_h rkey,std::vector<void * > & reqs)927     void put_nb(std::string& send_buf, std::string& recv_buf, ucp_rkey_h rkey,
928                 std::vector<void*>& reqs)
929     {
930         reqs.push_back(ucp_put_nb(sender().ep(), &send_buf[0], send_buf.size(),
931                                   (uintptr_t)&recv_buf[0], rkey, scomplete_cb));
932         reqs.push_back(ucp_ep_flush_nb(sender().ep(), 0, scomplete_cb));
933     }
934 
935 protected:
936     typedef void (test_ucp_sockaddr_protocols::*rma_nb_func_t)(
937                     std::string&, std::string&, ucp_rkey_h, std::vector<void*>&);
938 
compare_buffers(std::string & send_buf,std::string & recv_buf)939     void compare_buffers(std::string& send_buf, std::string& recv_buf)
940     {
941         EXPECT_TRUE(send_buf == recv_buf)
942             << "send_buf: '" << ucs::compact_string(send_buf, 20) << "', "
943             << "recv_buf: '" << ucs::compact_string(recv_buf, 20) << "'";
944     }
945 
test_tag_send_recv(size_t size,bool is_exp,bool is_sync=false)946     void test_tag_send_recv(size_t size, bool is_exp, bool is_sync = false)
947     {
948         std::string send_buf(size, 'x');
949         std::string recv_buf(size, 'y');
950 
951         void *rreq = NULL, *sreq = NULL;
952 
953         if (is_exp) {
954             rreq = ucp_tag_recv_nb(receiver().worker(), &recv_buf[0], size,
955                                    ucp_dt_make_contig(1), 0, 0, rtag_complete_cb);
956         }
957 
958         if (is_sync) {
959             sreq = ucp_tag_send_sync_nb(sender().ep(), &send_buf[0], size,
960                                         ucp_dt_make_contig(1), 0, scomplete_cb);
961         } else {
962             sreq = ucp_tag_send_nb(sender().ep(), &send_buf[0], size,
963                                    ucp_dt_make_contig(1), 0, scomplete_cb);
964         }
965 
966         if (!is_exp) {
967             short_progress_loop();
968             rreq = ucp_tag_recv_nb(receiver().worker(), &recv_buf[0], size,
969                                    ucp_dt_make_contig(1), 0, 0, rtag_complete_cb);
970         }
971 
972         wait(sreq);
973         wait(rreq);
974 
975         compare_buffers(send_buf, recv_buf);
976     }
977 
wait_for_server_ep()978     void wait_for_server_ep()
979     {
980         if (!test_ucp_sockaddr::wait_for_server_ep(false)) {
981             UCS_TEST_ABORT("server endpoint is not created");
982         }
983     }
984 
test_stream_send_recv(size_t size,bool is_exp)985     void test_stream_send_recv(size_t size, bool is_exp)
986     {
987         std::string send_buf(size, 'x');
988         std::string recv_buf(size, 'y');
989         size_t recv_length;
990         void *rreq, *sreq;
991 
992         if (is_exp) {
993             wait_for_server_ep();
994             rreq = ucp_stream_recv_nb(receiver().ep(), &recv_buf[0], size,
995                                       ucp_dt_make_contig(1), rstream_complete_cb,
996                                       &recv_length, UCP_STREAM_RECV_FLAG_WAITALL);
997             sreq = ucp_stream_send_nb(sender().ep(), &send_buf[0], size,
998                                       ucp_dt_make_contig(1), scomplete_cb, 0);
999         } else {
1000             sreq = ucp_stream_send_nb(sender().ep(), &send_buf[0], size,
1001                                    ucp_dt_make_contig(1), scomplete_cb, 0);
1002             short_progress_loop();
1003             wait_for_server_ep();
1004             rreq = ucp_stream_recv_nb(receiver().ep(), &recv_buf[0], size,
1005                                       ucp_dt_make_contig(1), rstream_complete_cb,
1006                                       &recv_length, UCP_STREAM_RECV_FLAG_WAITALL);
1007         }
1008 
1009         wait(sreq);
1010         wait(rreq);
1011 
1012         compare_buffers(send_buf, recv_buf);
1013     }
1014 
register_mem(entity * initiator,entity * target,void * buffer,size_t length,ucp_mem_h * memh_p,ucp_rkey_h * rkey_p)1015     void register_mem(entity* initiator, entity* target, void *buffer,
1016                       size_t length, ucp_mem_h *memh_p, ucp_rkey_h *rkey_p)
1017     {
1018         ucp_mem_map_params_t params = {0};
1019         params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
1020                             UCP_MEM_MAP_PARAM_FIELD_LENGTH;
1021         params.address    = buffer;
1022         params.length     = length;
1023 
1024         ucs_status_t status = ucp_mem_map(target->ucph(), &params, memh_p);
1025         ASSERT_UCS_OK(status);
1026 
1027         void *rkey_buffer;
1028         size_t rkey_buffer_size;
1029         status = ucp_rkey_pack(target->ucph(), *memh_p, &rkey_buffer,
1030                                &rkey_buffer_size);
1031         ASSERT_UCS_OK(status);
1032 
1033         status = ucp_ep_rkey_unpack(initiator->ep(), rkey_buffer, rkey_p);
1034         ASSERT_UCS_OK(status);
1035 
1036         ucp_rkey_buffer_release(rkey_buffer);
1037     }
1038 
test_rma(size_t size,rma_nb_func_t rma_func)1039     void test_rma(size_t size, rma_nb_func_t rma_func)
1040     {
1041         std::string send_buf(size, 'x');
1042         std::string recv_buf(size, 'y');
1043 
1044         ucp_mem_h memh;
1045         ucp_rkey_h rkey;
1046 
1047         register_mem(&sender(), &receiver(), &recv_buf[0], size, &memh, &rkey);
1048 
1049         std::vector<void*> reqs;
1050         (this->*rma_func)(send_buf, recv_buf, rkey, reqs);
1051 
1052         while (!reqs.empty()) {
1053             wait(reqs.back());
1054             reqs.pop_back();
1055         }
1056 
1057         compare_buffers(send_buf, recv_buf);
1058 
1059         ucp_rkey_destroy(rkey);
1060         ucs_status_t status = ucp_mem_unmap(receiver().ucph(), memh);
1061         ASSERT_UCS_OK(status);
1062     }
1063 
test_am_send_recv(size_t size)1064     void test_am_send_recv(size_t size)
1065     {
1066         std::string sb(size, 'x');
1067 
1068         bool am_received = false;
1069         ucp_worker_set_am_handler(receiver().worker(), 0,
1070                                   rx_am_msg_cb, &am_received, 0);
1071 
1072         ucs_status_ptr_t sreq = ucp_am_send_nb(sender().ep(), 0, &sb[0], size,
1073                                                ucp_dt_make_contig(1),
1074                                                scomplete_cb, 0);
1075         wait(sreq);
1076         wait_for_flag(&am_received);
1077         EXPECT_TRUE(am_received);
1078 
1079         ucp_worker_set_am_handler(receiver().worker(), 0, NULL, NULL, 0);
1080     }
1081 
1082 private:
rx_am_msg_cb(void * arg,void * data,size_t length,ucp_ep_h reply_ep,unsigned flags)1083     static ucs_status_t rx_am_msg_cb(void *arg, void *data, size_t length,
1084                                      ucp_ep_h reply_ep, unsigned flags) {
1085         volatile bool *am_rx = reinterpret_cast<volatile bool*>(arg);
1086         EXPECT_FALSE(*am_rx);
1087         *am_rx = true;
1088         return UCS_OK;
1089     }
1090 };
1091 
1092 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_exp,
1093            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1094 {
1095     test_tag_send_recv(4 * UCS_KBYTE, true);
1096 }
1097 
1098 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_exp,
1099            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1100 {
1101     test_tag_send_recv(64 * UCS_KBYTE, true);
1102 }
1103 
1104 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_exp_sync,
1105            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1106 {
1107     test_tag_send_recv(4 * UCS_KBYTE, true, true);
1108 }
1109 
1110 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_exp_sync,
1111            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1112 {
1113     test_tag_send_recv(64 * UCS_KBYTE, true, true);
1114 }
1115 
1116 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_rndv_exp, "RNDV_THRESH=10k")
1117 {
1118     test_tag_send_recv(64 * UCS_KBYTE, true);
1119 }
1120 
1121 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_unexp,
1122            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1123 {
1124     test_tag_send_recv(4 * UCS_KBYTE, false);
1125 }
1126 
1127 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_unexp,
1128            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1129 {
1130     test_tag_send_recv(64 * UCS_KBYTE, false);
1131 }
1132 
1133 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_4k_unexp_sync,
1134            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1135 {
1136     test_tag_send_recv(4 * UCS_KBYTE, false, true);
1137 }
1138 
1139 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_zcopy_64k_unexp_sync,
1140            "ZCOPY_THRESH=2k", "RNDV_THRESH=inf")
1141 {
1142     test_tag_send_recv(64 * UCS_KBYTE, false, true);
1143 }
1144 
1145 UCS_TEST_P(test_ucp_sockaddr_protocols, tag_rndv_unexp, "RNDV_THRESH=10k")
1146 {
1147     test_tag_send_recv(64 * UCS_KBYTE, false);
1148 }
1149 
1150 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_4k_exp, "ZCOPY_THRESH=inf")
1151 {
1152     test_stream_send_recv(4 * UCS_KBYTE, true);
1153 }
1154 
1155 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_4k_unexp,
1156            "ZCOPY_THRESH=inf")
1157 {
1158     test_stream_send_recv(4 * UCS_KBYTE, false);
1159 }
1160 
1161 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_64k_exp, "ZCOPY_THRESH=inf")
1162 {
1163     test_stream_send_recv(64 * UCS_KBYTE, true);
1164 }
1165 
1166 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_bcopy_64k_unexp,
1167            "ZCOPY_THRESH=inf")
1168 {
1169     test_stream_send_recv(64 * UCS_KBYTE, false);
1170 }
1171 
1172 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_zcopy_64k_exp, "ZCOPY_THRESH=2k")
1173 {
1174     test_stream_send_recv(64 * UCS_KBYTE, true);
1175 }
1176 
1177 UCS_TEST_P(test_ucp_sockaddr_protocols, stream_zcopy_64k_unexp,
1178            "ZCOPY_THRESH=2k")
1179 {
1180     test_stream_send_recv(64 * UCS_KBYTE, false);
1181 }
1182 
UCS_TEST_P(test_ucp_sockaddr_protocols,get_bcopy_small)1183 UCS_TEST_P(test_ucp_sockaddr_protocols, get_bcopy_small)
1184 {
1185     test_rma(8, &test_ucp_sockaddr_protocols::get_nb);
1186 }
1187 
1188 UCS_TEST_P(test_ucp_sockaddr_protocols, get_bcopy, "ZCOPY_THRESH=inf")
1189 {
1190     test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::get_nb);
1191 }
1192 
1193 UCS_TEST_P(test_ucp_sockaddr_protocols, get_zcopy, "ZCOPY_THRESH=10k")
1194 {
1195     test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::get_nb);
1196 }
1197 
UCS_TEST_P(test_ucp_sockaddr_protocols,put_bcopy_small)1198 UCS_TEST_P(test_ucp_sockaddr_protocols, put_bcopy_small)
1199 {
1200     test_rma(8, &test_ucp_sockaddr_protocols::put_nb);
1201 }
1202 
1203 UCS_TEST_P(test_ucp_sockaddr_protocols, put_bcopy, "ZCOPY_THRESH=inf")
1204 {
1205     test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::put_nb);
1206 }
1207 
1208 UCS_TEST_P(test_ucp_sockaddr_protocols, put_zcopy, "ZCOPY_THRESH=10k")
1209 {
1210     test_rma(64 * UCS_KBYTE, &test_ucp_sockaddr_protocols::put_nb);
1211 }
1212 
UCS_TEST_P(test_ucp_sockaddr_protocols,am_short)1213 UCS_TEST_P(test_ucp_sockaddr_protocols, am_short)
1214 {
1215     test_am_send_recv(1);
1216 }
1217 
1218 UCS_TEST_P(test_ucp_sockaddr_protocols, am_bcopy_1k, "ZCOPY_THRESH=inf")
1219 {
1220     test_am_send_recv(1 * UCS_KBYTE);
1221 }
1222 
1223 UCS_TEST_P(test_ucp_sockaddr_protocols, am_bcopy_64k, "ZCOPY_THRESH=inf")
1224 {
1225     test_am_send_recv(64 * UCS_KBYTE);
1226 }
1227 
1228 UCS_TEST_P(test_ucp_sockaddr_protocols, am_zcopy_1k, "ZCOPY_THRESH=512")
1229 {
1230     test_am_send_recv(1 * UCS_KBYTE);
1231 }
1232 
1233 UCS_TEST_P(test_ucp_sockaddr_protocols, am_zcopy_64k, "ZCOPY_THRESH=512")
1234 {
1235     test_am_send_recv(64 * UCS_KBYTE);
1236 }
1237 
1238 
1239 /* Only IB transports support CM for now
1240  * For DC case, allow fallback to UD if DC is not supported
1241  */
1242 #define UCP_INSTANTIATE_CM_TEST_CASE(_test_case) \
1243     UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, dcudx, "dc_x,ud") \
1244     UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, ud,    "ud_v") \
1245     UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, udx,   "ud_x") \
1246     UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, rc,    "rc_v") \
1247     UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, rcx,   "rc_x") \
1248     UCP_INSTANTIATE_TEST_CASE_TLS(_test_case, ib,    "ib")
1249 
1250 UCP_INSTANTIATE_CM_TEST_CASE(test_ucp_sockaddr_protocols)
1251