1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
3 * See file LICENSE for terms.
4 */
5 
6 #include "ucp_test.h"
7 #include <common/test_helpers.h>
8 #include <ifaddrs.h>
9 
10 extern "C" {
11 #include <ucp/core/ucp_worker.h>
12 #if HAVE_IB
13 #include <uct/ib/ud/base/ud_iface.h>
14 #endif
15 #include <ucs/arch/atomic.h>
16 #include <ucs/stats/stats.h>
17 }
18 
19 #include <queue>
20 
21 namespace ucp {
22 const uint32_t MAGIC = 0xd7d7d7d7U;
23 }
24 
operator <<(std::ostream & os,const ucp_test_param & test_param)25 std::ostream& operator<<(std::ostream& os, const ucp_test_param& test_param)
26 {
27     std::vector<std::string>::const_iterator iter;
28     const std::vector<std::string>& transports = test_param.transports;
29     for (iter = transports.begin(); iter != transports.end(); ++iter) {
30         if (iter != transports.begin()) {
31             os << ",";
32         }
33         os << *iter;
34     }
35     return os;
36 }
37 
38 const ucp_datatype_t ucp_test::DATATYPE     = ucp_dt_make_contig(1);
39 const ucp_datatype_t ucp_test::DATATYPE_IOV = ucp_dt_make_iov();
40 
ucp_test()41 ucp_test::ucp_test() {
42     ucs_status_t status;
43     status = ucp_config_read(NULL, NULL, &m_ucp_config);
44     ASSERT_UCS_OK(status);
45 }
46 
~ucp_test()47 ucp_test::~ucp_test() {
48 
49     for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
50          iter != entities().end(); ++iter)
51     {
52         (*iter)->warn_existing_eps();
53     }
54     ucp_config_release(m_ucp_config);
55 }
56 
cleanup()57 void ucp_test::cleanup() {
58     /* disconnect before destroying the entities */
59     for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
60          iter != entities().end(); ++iter)
61     {
62         disconnect(**iter);
63     }
64 
65     for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
66          iter != entities().end(); ++iter)
67     {
68         (*iter)->cleanup();
69     }
70 
71     m_entities.clear();
72 }
73 
init()74 void ucp_test::init() {
75     test_base::init();
76 
77     create_entity();
78     if (!is_self()) {
79         create_entity();
80     }
81 }
82 
check_transport(const std::string check_tl_name,const std::vector<std::string> & tl_names)83 static bool check_transport(const std::string check_tl_name,
84                             const std::vector<std::string>& tl_names) {
85     return (std::find(tl_names.begin(), tl_names.end(),
86                       check_tl_name) != tl_names.end());
87 }
88 
has_transport(const std::string & tl_name) const89 bool ucp_test::has_transport(const std::string& tl_name) const {
90     return check_transport(tl_name, GetParam().transports);
91 }
92 
has_any_transport(const std::vector<std::string> & tl_names) const93 bool ucp_test::has_any_transport(const std::vector<std::string>& tl_names) const {
94     const std::vector<std::string>& all_tl_names = GetParam().transports;
95 
96     return std::find_first_of(all_tl_names.begin(), all_tl_names.end(),
97                               tl_names.begin(),     tl_names.end()) !=
98            all_tl_names.end();
99 }
100 
is_self() const101 bool ucp_test::is_self() const {
102     return "self" == GetParam().transports.front();
103 }
104 
create_entity(bool add_in_front)105 ucp_test_base::entity* ucp_test::create_entity(bool add_in_front) {
106     return create_entity(add_in_front, GetParam());
107 }
108 
109 ucp_test_base::entity*
create_entity(bool add_in_front,const ucp_test_param & test_param)110 ucp_test::create_entity(bool add_in_front, const ucp_test_param &test_param) {
111     entity *e = new entity(test_param, m_ucp_config, get_worker_params(), this);
112     if (add_in_front) {
113         m_entities.push_front(e);
114     } else {
115         m_entities.push_back(e);
116     }
117     return e;
118 }
119 
get_ctx_params()120 ucp_params_t ucp_test::get_ctx_params() {
121     ucp_params_t params;
122     memset(&params, 0, sizeof(params));
123     params.field_mask |= UCP_PARAM_FIELD_FEATURES;
124     return params;
125 }
126 
get_worker_params()127 ucp_worker_params_t ucp_test::get_worker_params() {
128     ucp_worker_params_t params;
129     memset(&params, 0, sizeof(params));
130     params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
131     params.thread_mode = UCS_THREAD_MODE_MULTI;
132     return params;
133 }
134 
get_ep_params()135 ucp_ep_params_t ucp_test::get_ep_params() {
136     ucp_ep_params_t params;
137     memset(&params, 0, sizeof(params));
138     return params;
139 }
140 
progress(int worker_index) const141 unsigned ucp_test::progress(int worker_index) const {
142     unsigned count = 0;
143     for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
144          iter != entities().end(); ++iter)
145     {
146         count += (*iter)->progress(worker_index);
147         sched_yield();
148     }
149     return count;
150 }
151 
short_progress_loop(int worker_index) const152 void ucp_test::short_progress_loop(int worker_index) const {
153     for (unsigned i = 0; i < 100; ++i) {
154         progress(worker_index);
155         usleep(100);
156     }
157 }
158 
flush_ep(const entity & e,int worker_index,int ep_index)159 void ucp_test::flush_ep(const entity &e, int worker_index, int ep_index)
160 {
161     void *request = e.flush_ep_nb(worker_index, ep_index);
162     wait(request, worker_index);
163 }
164 
flush_worker(const entity & e,int worker_index)165 void ucp_test::flush_worker(const entity &e, int worker_index)
166 {
167     void *request = e.flush_worker_nb(worker_index);
168     wait(request, worker_index);
169 }
170 
disconnect(entity & e)171 void ucp_test::disconnect(entity& e) {
172     bool has_failed_entity = false;
173     for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
174          !has_failed_entity && (iter != entities().end()); ++iter) {
175         has_failed_entity = ((*iter)->get_err_num() > 0);
176     }
177 
178     for (int i = 0; i < e.get_num_workers(); i++) {
179         enum ucp_ep_close_mode close_mode;
180 
181         if (has_failed_entity) {
182             close_mode = UCP_EP_CLOSE_MODE_FORCE;
183         } else {
184             flush_worker(e, i);
185             close_mode = UCP_EP_CLOSE_MODE_FLUSH;
186         }
187 
188         e.close_all_eps(*this, i, close_mode);
189     }
190 }
191 
wait(void * req,int worker_index)192 void ucp_test::wait(void *req, int worker_index)
193 {
194     if (req == NULL) {
195         return;
196     }
197 
198     if (UCS_PTR_IS_ERR(req)) {
199         ucs_error("operation returned error: %s",
200                   ucs_status_string(UCS_PTR_STATUS(req)));
201         return;
202     }
203 
204     ucs_status_t status;
205     ucs_time_t deadline = ucs::get_deadline();
206     do {
207         progress(worker_index);
208         status = ucp_request_check_status(req);
209     } while ((status == UCS_INPROGRESS) && (ucs_get_time() < deadline));
210 
211     if (status != UCS_OK) {
212         /* UCS errors are suppressed in case of error handling tests */
213         ucs_error("request %p completed with error %s", req,
214                   ucs_status_string(status));
215     }
216 
217     ucp_request_release(req);
218 }
219 
set_ucp_config(ucp_config_t * config)220 void ucp_test::set_ucp_config(ucp_config_t *config) {
221     set_ucp_config(config, GetParam());
222 }
223 
max_connections()224 int ucp_test::max_connections() {
225     if (has_transport("tcp")) {
226         return ucs::max_tcp_connections();
227     } else {
228         return std::numeric_limits<int>::max();
229     }
230 }
231 
232 std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)233 ucp_test::enum_test_params(const ucp_params_t& ctx_params,
234                            const std::string& name,
235                            const std::string& test_case_name,
236                            const std::string& tls)
237 {
238     ucp_test_param test_param;
239     std::stringstream ss(tls);
240 
241     test_param.ctx_params    = ctx_params;
242     test_param.variant       = DEFAULT_PARAM_VARIANT;
243     test_param.thread_type   = SINGLE_THREAD;
244 
245     while (ss.good()) {
246         std::string tl_name;
247         std::getline(ss, tl_name, ',');
248         test_param.transports.push_back(tl_name);
249     }
250 
251     if (check_test_param(name, test_case_name, test_param)) {
252         return std::vector<ucp_test_param>(1, test_param);
253     } else {
254         return std::vector<ucp_test_param>();
255     }
256 }
257 
generate_test_params_variant(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls,int variant,std::vector<ucp_test_param> & test_params,int thread_type)258 void ucp_test::generate_test_params_variant(const ucp_params_t& ctx_params,
259                                             const std::string& name,
260                                             const std::string& test_case_name,
261                                             const std::string& tls,
262                                             int variant,
263                                             std::vector<ucp_test_param>& test_params,
264                                             int thread_type)
265 {
266     std::vector<ucp_test_param> tmp_test_params;
267 
268     tmp_test_params = ucp_test::enum_test_params(ctx_params,name,
269                                                  test_case_name, tls);
270     for (std::vector<ucp_test_param>::iterator iter = tmp_test_params.begin();
271          iter != tmp_test_params.end(); ++iter)
272     {
273         iter->variant = variant;
274         iter->thread_type = thread_type;
275         test_params.push_back(*iter);
276     }
277 }
278 
set_ucp_config(ucp_config_t * config,const ucp_test_param & test_param)279 void ucp_test::set_ucp_config(ucp_config_t *config,
280                               const ucp_test_param& test_param)
281 {
282     std::stringstream ss;
283     ss << test_param;
284     ucp_config_modify(config, "TLS", ss.str().c_str());
285     /* prevent configuration warnings in the UCP testing */
286     ucp_config_modify(config, "WARN_INVALID_CONFIG", "no");
287 }
288 
modify_config(const std::string & name,const std::string & value,bool optional)289 void ucp_test::modify_config(const std::string& name, const std::string& value,
290                              bool optional)
291 {
292     ucs_status_t status;
293 
294     status = ucp_config_modify(m_ucp_config, name.c_str(), value.c_str());
295     if (status == UCS_ERR_NO_ELEM) {
296         test_base::modify_config(name, value, optional);
297     } else if (status != UCS_OK) {
298         UCS_TEST_ABORT("Couldn't modify ucp config parameter: " <<
299                         name.c_str() << " to " << value.c_str() << ": " <<
300                         ucs_status_string(status));
301     }
302 }
303 
stats_activate()304 void ucp_test::stats_activate()
305 {
306     ucs_stats_cleanup();
307     push_config();
308     modify_config("STATS_DEST",    "file:/dev/null");
309     modify_config("STATS_TRIGGER", "exit");
310     ucs_stats_init();
311     ASSERT_TRUE(ucs_stats_is_active());
312 }
313 
stats_restore()314 void ucp_test::stats_restore()
315 {
316     ucs_stats_cleanup();
317     pop_config();
318     ucs_stats_init();
319 }
320 
321 
check_test_param(const std::string & name,const std::string & test_case_name,const ucp_test_param & test_param)322 bool ucp_test::check_test_param(const std::string& name,
323                                 const std::string& test_case_name,
324                                 const ucp_test_param& test_param)
325 {
326     typedef std::map<std::string, bool> cache_t;
327     static cache_t cache;
328 
329     if (test_param.transports.empty()) {
330         return false;
331     }
332 
333     cache_t::iterator iter = cache.find(name);
334     if (iter != cache.end()) {
335         return iter->second;
336     }
337 
338     ucs::handle<ucp_config_t*> config;
339     UCS_TEST_CREATE_HANDLE(ucp_config_t*, config, ucp_config_release,
340                            ucp_config_read, NULL, NULL);
341     set_ucp_config(config, test_param);
342 
343     ucp_context_h ucph;
344     ucs_status_t status;
345     {
346         scoped_log_handler slh(hide_errors_logger);
347         status = ucp_init(&test_param.ctx_params, config, &ucph);
348     }
349 
350     bool result;
351     if (status == UCS_OK) {
352         ucp_cleanup(ucph);
353         result = true;
354     } else if (status == UCS_ERR_NO_DEVICE) {
355         result = false;
356     } else {
357         UCS_TEST_ABORT("Failed to create context (" << test_case_name << "): "
358                        << ucs_status_string(status));
359     }
360 
361     UCS_TEST_MESSAGE << "checking " << name << ": " << (result ? "yes" : "no");
362     cache[name] = result;
363     return result;
364 }
365 
entity(const ucp_test_param & test_param,ucp_config_t * ucp_config,const ucp_worker_params_t & worker_params,const ucp_test_base * test_owner)366 ucp_test_base::entity::entity(const ucp_test_param& test_param,
367                               ucp_config_t* ucp_config,
368                               const ucp_worker_params_t& worker_params,
369                               const ucp_test_base *test_owner)
370     : m_err_cntr(0), m_rejected_cntr(0)
371 {
372     ucp_test_param entity_param = test_param;
373     ucp_worker_params_t local_worker_params = worker_params;
374     int num_workers;
375 
376     if (test_param.thread_type == MULTI_THREAD_CONTEXT) {
377         num_workers = MT_TEST_NUM_THREADS;
378         entity_param.ctx_params.mt_workers_shared = 1;
379         local_worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
380     } else if (test_param.thread_type == MULTI_THREAD_WORKER) {
381         num_workers = 1;
382         entity_param.ctx_params.mt_workers_shared = 0;
383         local_worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
384     } else {
385         num_workers = 1;
386         entity_param.ctx_params.mt_workers_shared = 0;
387         local_worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
388     }
389 
390     entity_param.ctx_params.field_mask |= UCP_PARAM_FIELD_MT_WORKERS_SHARED;
391     local_worker_params.field_mask     |= UCP_WORKER_PARAM_FIELD_THREAD_MODE;
392 
393     ucp_test::set_ucp_config(ucp_config, entity_param);
394 
395     {
396         scoped_log_handler slh(hide_errors_logger);
397         UCS_TEST_CREATE_HANDLE_IF_SUPPORTED(ucp_context_h, m_ucph, ucp_cleanup,
398                                             ucp_init, &entity_param.ctx_params,
399                                             ucp_config);
400     }
401 
402     m_workers.resize(num_workers);
403     for (int i = 0; i < num_workers; i++) {
404         UCS_TEST_CREATE_HANDLE(ucp_worker_h, m_workers[i].first,
405                                ucp_worker_destroy, ucp_worker_create, m_ucph,
406                                &local_worker_params);
407     }
408 }
409 
~entity()410 ucp_test_base::entity::~entity() {
411     cleanup();
412 }
413 
connect(const entity * other,const ucp_ep_params_t & ep_params,int ep_idx,int do_set_ep)414 void ucp_test_base::entity::connect(const entity* other,
415                                     const ucp_ep_params_t& ep_params,
416                                     int ep_idx, int do_set_ep) {
417     assert(get_num_workers() == other->get_num_workers());
418     for (unsigned i = 0; i < unsigned(get_num_workers()); i++) {
419         ucs_status_t status;
420         ucp_address_t *address;
421         size_t address_length;
422         ucp_ep_h ep;
423 
424         status = ucp_worker_get_address(other->worker(i), &address, &address_length);
425         ASSERT_UCS_OK(status);
426 
427         {
428             scoped_log_handler slh(hide_errors_logger);
429 
430             ucp_ep_params_t local_ep_params = ep_params;
431             local_ep_params.field_mask |= UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
432             local_ep_params.address     = address;
433 
434             status = ucp_ep_create(m_workers[i].first, &local_ep_params, &ep);
435         }
436 
437         if (status == UCS_ERR_UNREACHABLE) {
438             ucp_worker_release_address(other->worker(i), address);
439             UCS_TEST_SKIP_R(m_errors.empty() ? "Unreachable" : m_errors.back());
440         }
441 
442         ASSERT_UCS_OK(status, << " (" << m_errors.back() << ")");
443 
444         if (do_set_ep) {
445             set_ep(ep, i, ep_idx);
446         }
447 
448         ucp_worker_release_address(other->worker(i), address);
449     }
450 }
451 
452 /*
453  * Checks if the client's address matches any IP address on the server's side.
454  */
verify_client_address(struct sockaddr_storage * client_address)455 bool ucp_test_base::entity::verify_client_address(struct sockaddr_storage
456                                                   *client_address)
457 {
458     struct ifaddrs* ifaddrs;
459 
460     if (getifaddrs(&ifaddrs) != 0) {
461         return false;
462     }
463 
464     for (struct ifaddrs *ifa = ifaddrs; ifa != NULL; ifa = ifa->ifa_next) {
465         if (ucs_netif_flags_is_active(ifa->ifa_flags) &&
466             ucs::is_inet_addr(ifa->ifa_addr))
467         {
468             if (!ucs_sockaddr_ip_cmp((const struct sockaddr*)client_address,
469                                      ifa->ifa_addr)) {
470                 freeifaddrs(ifaddrs);
471                 return true;
472             }
473         }
474     }
475 
476     freeifaddrs(ifaddrs);
477     return false;
478 }
479 
accept(ucp_worker_h worker,ucp_conn_request_h conn_request)480 ucp_ep_h ucp_test_base::entity::accept(ucp_worker_h worker,
481                                        ucp_conn_request_h conn_request)
482 {
483     ucp_ep_params_t ep_params = *m_server_ep_params;
484     ucp_conn_request_attr_t attr;
485     ucs_status_t status;
486     ucp_ep_h ep;
487 
488     attr.field_mask = UCP_CONN_REQUEST_ATTR_FIELD_CLIENT_ADDR;
489     status = ucp_conn_request_query(conn_request, &attr);
490     EXPECT_TRUE((status == UCS_OK) || (status == UCS_ERR_UNSUPPORTED));
491     if (status == UCS_OK) {
492         EXPECT_TRUE(verify_client_address(&attr.client_address));
493     }
494 
495     ep_params.field_mask  |= UCP_EP_PARAM_FIELD_CONN_REQUEST |
496                              UCP_EP_PARAM_FIELD_USER_DATA;
497     ep_params.user_data    = reinterpret_cast<void*>(this);
498     ep_params.conn_request = conn_request;
499 
500     status = ucp_ep_create(worker, &ep_params, &ep);
501     if (status == UCS_ERR_UNREACHABLE) {
502         UCS_TEST_SKIP_R("Skipping due an unreachable destination (unsupported "
503                         "feature or no supported transport to send partial "
504                         "worker address)");
505     }
506     ASSERT_UCS_OK(status);
507     return ep;
508 }
509 
510 
modify_ep(const ucp_ep_params_t & ep_params,int worker_idx,int ep_idx)511 void* ucp_test_base::entity::modify_ep(const ucp_ep_params_t& ep_params,
512                                       int worker_idx, int ep_idx) {
513     return ucp_ep_modify_nb(ep(worker_idx, ep_idx), &ep_params);
514 }
515 
516 
set_ep(ucp_ep_h ep,int worker_index,int ep_index)517 void ucp_test_base::entity::set_ep(ucp_ep_h ep, int worker_index, int ep_index)
518 {
519     if (ep_index < get_num_eps(worker_index)) {
520         m_workers[worker_index].second[ep_index].reset(ep, ep_destructor, this);
521     } else {
522         m_workers[worker_index].second.push_back(
523                         ucs::handle<ucp_ep_h, entity *>(ep, ucp_ep_destroy));
524     }
525 }
526 
empty_send_completion(void * r,ucs_status_t status)527 void ucp_test_base::entity::empty_send_completion(void *r, ucs_status_t status) {
528 }
529 
accept_ep_cb(ucp_ep_h ep,void * arg)530 void ucp_test_base::entity::accept_ep_cb(ucp_ep_h ep, void *arg) {
531     entity *self = reinterpret_cast<entity*>(arg);
532     int worker_index = 0; /* TODO pass worker index in arg */
533 
534     /* take error handler from test fixture and add user data */
535     ucp_ep_params_t ep_params = *self->m_server_ep_params;
536     ep_params.field_mask &= UCP_EP_PARAM_FIELD_ERR_HANDLER;
537     ep_params.field_mask |= UCP_EP_PARAM_FIELD_USER_DATA;
538     ep_params.user_data   = reinterpret_cast<void*>(self);
539 
540     void *req = ucp_ep_modify_nb(ep, &ep_params);
541     ASSERT_UCS_PTR_OK(req); /* don't expect this operation to block */
542 
543     self->set_ep(ep, worker_index, self->get_num_eps(worker_index));
544 }
545 
accept_conn_cb(ucp_conn_request_h conn_req,void * arg)546 void ucp_test_base::entity::accept_conn_cb(ucp_conn_request_h conn_req, void* arg)
547 {
548     entity *self = reinterpret_cast<entity*>(arg);
549     self->m_conn_reqs.push(conn_req);
550 }
551 
reject_conn_cb(ucp_conn_request_h conn_req,void * arg)552 void ucp_test_base::entity::reject_conn_cb(ucp_conn_request_h conn_req, void* arg)
553 {
554     entity *self = reinterpret_cast<entity*>(arg);
555     ucp_listener_reject(self->m_listener, conn_req);
556     self->m_rejected_cntr++;
557 }
558 
flush_ep_nb(int worker_index,int ep_index) const559 void* ucp_test_base::entity::flush_ep_nb(int worker_index, int ep_index) const {
560     return ucp_ep_flush_nb(ep(worker_index, ep_index), 0, empty_send_completion);
561 }
562 
flush_worker_nb(int worker_index) const563 void* ucp_test_base::entity::flush_worker_nb(int worker_index) const {
564     if (worker(worker_index) == NULL) {
565         return NULL;
566     }
567     return ucp_worker_flush_nb(worker(worker_index), 0, empty_send_completion);
568 }
569 
fence(int worker_index) const570 void ucp_test_base::entity::fence(int worker_index) const {
571     ucs_status_t status = ucp_worker_fence(worker(worker_index));
572     ASSERT_UCS_OK(status);
573 }
574 
disconnect_nb(int worker_index,int ep_index,enum ucp_ep_close_mode mode)575 void *ucp_test_base::entity::disconnect_nb(int worker_index, int ep_index,
576                                            enum ucp_ep_close_mode mode) {
577     ucp_ep_h ep = revoke_ep(worker_index, ep_index);
578     if (ep == NULL) {
579         return NULL;
580     }
581 
582     void *req = ucp_ep_close_nb(ep, mode);
583     if (UCS_PTR_IS_PTR(req)) {
584         m_close_ep_reqs.push_back(req);
585         return req;
586     }
587 
588     ASSERT_UCS_OK(UCS_PTR_STATUS(req));
589     return NULL;
590 }
591 
close_ep_req_free(void * close_req)592 void ucp_test_base::entity::close_ep_req_free(void *close_req) {
593     if (close_req == NULL) {
594         return;
595     }
596 
597     ucs_status_t status = UCS_PTR_IS_ERR(close_req) ? UCS_PTR_STATUS(close_req) :
598                           ucp_request_check_status(close_req);
599     ASSERT_NE(UCS_INPROGRESS, status) << "free not completed EP close request";
600     if (status != UCS_OK) {
601         UCS_TEST_MESSAGE << "ucp_ep_close_nb completed with status "
602                          << ucs_status_string(status);
603     }
604 
605     m_close_ep_reqs.erase(std::find(m_close_ep_reqs.begin(),
606                                     m_close_ep_reqs.end(), close_req));
607     ucp_request_free(close_req);
608 }
609 
close_all_eps(const ucp_test & test,int worker_idx,enum ucp_ep_close_mode mode)610 void ucp_test_base::entity::close_all_eps(const ucp_test &test, int worker_idx,
611                                           enum ucp_ep_close_mode mode) {
612     for (int j = 0; j < get_num_eps(worker_idx); j++) {
613         disconnect_nb(worker_idx, j, mode);
614     }
615 
616     ucs_time_t deadline = ucs::get_deadline();
617     while (!m_close_ep_reqs.empty() && (ucs_get_time() < deadline)) {
618         void *req = m_close_ep_reqs.front();
619         while (!is_request_completed(req)) {
620             test.progress(worker_idx);
621         }
622 
623         close_ep_req_free(req);
624     }
625 
626     EXPECT_TRUE(m_close_ep_reqs.empty()) << m_close_ep_reqs.size()
627                                          << " endpoints were not closed";
628 }
629 
destroy_worker(int worker_index)630 void ucp_test_base::entity::destroy_worker(int worker_index) {
631     for (size_t i = 0; i < m_workers[worker_index].second.size(); ++i) {
632         m_workers[worker_index].second[i].revoke();
633     }
634     m_workers[worker_index].first.reset();
635 }
636 
ep(int worker_index,int ep_index) const637 ucp_ep_h ucp_test_base::entity::ep(int worker_index, int ep_index) const {
638     if (size_t(worker_index) < m_workers.size()) {
639         if (size_t(ep_index) < m_workers[worker_index].second.size()) {
640             return m_workers[worker_index].second[ep_index];
641         }
642     }
643     return NULL;
644 }
645 
revoke_ep(int worker_index,int ep_index) const646 ucp_ep_h ucp_test_base::entity::revoke_ep(int worker_index, int ep_index) const {
647     ucp_ep_h ucp_ep = ep(worker_index, ep_index);
648 
649     if (ucp_ep) {
650         m_workers[worker_index].second[ep_index].revoke();
651     }
652 
653     return ucp_ep;
654 }
655 
listen(listen_cb_type_t cb_type,const struct sockaddr * saddr,socklen_t addrlen,const ucp_ep_params_t & ep_params,int worker_index)656 ucs_status_t ucp_test_base::entity::listen(listen_cb_type_t cb_type,
657                                            const struct sockaddr* saddr,
658                                            socklen_t addrlen,
659                                            const ucp_ep_params_t& ep_params,
660                                            int worker_index)
661 {
662     ucp_listener_params_t params;
663     ucp_listener_h        listener;
664 
665     params.field_mask             = UCP_LISTENER_PARAM_FIELD_SOCK_ADDR;
666     params.sockaddr.addr          = saddr;
667     params.sockaddr.addrlen       = addrlen;
668 
669     switch (cb_type) {
670     case LISTEN_CB_EP:
671         params.field_mask        |= UCP_LISTENER_PARAM_FIELD_ACCEPT_HANDLER;
672         params.accept_handler.cb  = accept_ep_cb;
673         params.accept_handler.arg = reinterpret_cast<void*>(this);
674         break;
675     case LISTEN_CB_CONN:
676         params.field_mask        |= UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
677         params.conn_handler.cb    = accept_conn_cb;
678         params.conn_handler.arg   = reinterpret_cast<void*>(this);
679         break;
680     case LISTEN_CB_REJECT:
681         params.field_mask        |= UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
682         params.conn_handler.cb    = reject_conn_cb;
683         params.conn_handler.arg   = reinterpret_cast<void*>(this);
684         break;
685     default:
686         UCS_TEST_ABORT("invalid test parameter");
687     }
688 
689     m_server_ep_params.reset(new ucp_ep_params_t(ep_params),
690                              ucs::deleter<ucp_ep_params_t>);
691 
692     ucs_status_t status;
693     {
694         scoped_log_handler wrap_err(wrap_errors_logger);
695         status = ucp_listener_create(worker(worker_index), &params, &listener);
696     }
697 
698     if (status == UCS_OK) {
699         m_listener.reset(listener, ucp_listener_destroy);
700     } else {
701         /* throw error if status is not (UCS_OK or UCS_ERR_UNREACHABLE or
702          * UCS_ERR_BUSY).
703          * UCS_ERR_INVALID_PARAM may also return but then the test should fail */
704         EXPECT_TRUE((status == UCS_ERR_UNREACHABLE) ||
705                     (status == UCS_ERR_BUSY)) << ucs_status_string(status);
706     }
707 
708     return status;
709 }
710 
worker(int worker_index) const711 ucp_worker_h ucp_test_base::entity::worker(int worker_index) const {
712     if (worker_index < get_num_workers()) {
713         return m_workers[worker_index].first;
714     } else {
715         return NULL;
716     }
717 }
718 
ucph() const719 ucp_context_h ucp_test_base::entity::ucph() const {
720     return m_ucph;
721 }
722 
listenerh() const723 ucp_listener_h ucp_test_base::entity::listenerh() const {
724     return m_listener;
725 }
726 
progress(int worker_index)727 unsigned ucp_test_base::entity::progress(int worker_index)
728 {
729     ucp_worker_h ucp_worker = worker(worker_index);
730 
731     if (ucp_worker == NULL) {
732         return 0;
733     }
734 
735     unsigned progress_count = 0;
736     if (!m_conn_reqs.empty()) {
737         ucp_conn_request_h conn_req = m_conn_reqs.back();
738         m_conn_reqs.pop();
739         ucp_ep_h ep = accept(ucp_worker, conn_req);
740         set_ep(ep, worker_index, std::numeric_limits<int>::max());
741         ++progress_count;
742     }
743 
744     return progress_count + ucp_worker_progress(ucp_worker);
745 }
746 
get_num_workers() const747 int ucp_test_base::entity::get_num_workers() const {
748     return m_workers.size();
749 }
750 
get_num_eps(int worker_index) const751 int ucp_test_base::entity::get_num_eps(int worker_index) const {
752     return m_workers[worker_index].second.size();
753 }
754 
add_err(ucs_status_t status)755 void ucp_test_base::entity::add_err(ucs_status_t status) {
756     switch (status) {
757     case UCS_ERR_REJECTED:
758         ++m_rejected_cntr;
759         /* fall through */
760     default:
761         ++m_err_cntr;
762     }
763 
764     EXPECT_EQ(1ul, m_err_cntr) << "error callback is called more than once";
765 }
766 
get_err_num_rejected() const767 const size_t &ucp_test_base::entity::get_err_num_rejected() const {
768     return m_rejected_cntr;
769 }
770 
get_err_num() const771 const size_t &ucp_test_base::entity::get_err_num() const {
772     return m_err_cntr;
773 }
774 
warn_existing_eps() const775 void ucp_test_base::entity::warn_existing_eps() const {
776     for (size_t worker_index = 0; worker_index < m_workers.size(); ++worker_index) {
777         for (size_t ep_index = 0; ep_index < m_workers[worker_index].second.size();
778              ++ep_index) {
779             ADD_FAILURE() << "ep(" << worker_index << "," << ep_index <<
780                              ")=" << m_workers[worker_index].second[ep_index].get() <<
781                              " was not destroyed during test cleanup()";
782         }
783     }
784 }
785 
set_ib_ud_timeout(double timeout_sec)786 double ucp_test_base::entity::set_ib_ud_timeout(double timeout_sec)
787 {
788     double prev_timeout_sec = 0.;
789 #if HAVE_IB
790     for (ucp_rsc_index_t rsc_index = 0;
791          rsc_index < ucph()->num_tls; ++rsc_index) {
792         ucp_worker_iface_t *wiface = ucp_worker_iface(worker(), rsc_index);
793         // check if the iface is ud transport
794         if (wiface->iface->ops.iface_flush == uct_ud_iface_flush) {
795             uct_ud_iface_t *iface =
796                 ucs_derived_of(wiface->iface, uct_ud_iface_t);
797 
798             uct_ud_enter(iface);
799             if (!prev_timeout_sec) {
800                 prev_timeout_sec = ucs_time_to_sec(iface->config.peer_timeout);
801             }
802 
803             iface->config.peer_timeout = ucs_time_from_sec(timeout_sec);
804             uct_ud_leave(iface);
805         }
806     }
807 #endif
808     return prev_timeout_sec;
809 }
810 
cleanup()811 void ucp_test_base::entity::cleanup() {
812     m_listener.reset();
813     m_workers.clear();
814 }
815 
ep_destructor(ucp_ep_h ep,entity * e)816 void ucp_test_base::entity::ep_destructor(ucp_ep_h ep, entity *e)
817 {
818     ucs_status_ptr_t req = ucp_disconnect_nb(ep);
819     if (!UCS_PTR_IS_PTR(req)) {
820         return;
821     }
822 
823     ucs_status_t        status;
824     ucp_tag_recv_info_t info;
825     do {
826         e->progress();
827         status = ucp_request_test(req, &info);
828     } while (status == UCS_INPROGRESS);
829     EXPECT_EQ(UCS_OK, status);
830     ucp_request_release(req);
831 }
832 
is_request_completed(void * request)833 bool ucp_test_base::is_request_completed(void *request) {
834     return (request == NULL) ||
835            (ucp_request_check_status(request) != UCS_INPROGRESS);
836 }
837 
mapped_buffer(size_t size,const entity & entity,int flags,ucs_memory_type_t mem_type)838 ucp_test::mapped_buffer::mapped_buffer(size_t size, const entity& entity,
839                                        int flags, ucs_memory_type_t mem_type) :
840     mem_buffer(size, mem_type), m_entity(entity), m_memh(NULL),
841     m_rkey_buffer(NULL)
842 {
843     ucs_status_t status;
844 
845     if (flags & (UCP_MEM_MAP_ALLOCATE|UCP_MEM_MAP_FIXED)) {
846         UCS_TEST_ABORT("mapped_buffer does not support allocation by UCP");
847     }
848 
849     ucp_mem_map_params_t params;
850     params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
851                         UCP_MEM_MAP_PARAM_FIELD_LENGTH |
852                         UCP_MEM_MAP_PARAM_FIELD_FLAGS;
853     params.flags      = flags;
854     params.address    = ptr();
855     params.length     = size;
856 
857     status = ucp_mem_map(m_entity.ucph(), &params, &m_memh);
858     ASSERT_UCS_OK(status);
859 
860     size_t rkey_buffer_size;
861     status = ucp_rkey_pack(m_entity.ucph(), m_memh, &m_rkey_buffer,
862                            &rkey_buffer_size);
863     ASSERT_UCS_OK(status);
864 }
865 
~mapped_buffer()866 ucp_test::mapped_buffer::~mapped_buffer()
867 {
868     ucp_rkey_buffer_release(m_rkey_buffer);
869     ucs_status_t status = ucp_mem_unmap(m_entity.ucph(), m_memh);
870     EXPECT_UCS_OK(status);
871 }
872 
rkey(const entity & entity) const873 ucs::handle<ucp_rkey_h> ucp_test::mapped_buffer::rkey(const entity& entity) const
874 {
875     ucp_rkey_h rkey;
876 
877     ucs_status_t status = ucp_ep_rkey_unpack(entity.ep(), m_rkey_buffer, &rkey);
878     ASSERT_UCS_OK(status);
879     return ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy);
880 }
881 
memh() const882 ucp_mem_h ucp_test::mapped_buffer::memh() const
883 {
884     return m_memh;
885 }
886