1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
3 * See file LICENSE for terms.
4 */
5
6 #include "ucp_test.h"
7 #include <common/test_helpers.h>
8 #include <ifaddrs.h>
9
10 extern "C" {
11 #include <ucp/core/ucp_worker.h>
12 #if HAVE_IB
13 #include <uct/ib/ud/base/ud_iface.h>
14 #endif
15 #include <ucs/arch/atomic.h>
16 #include <ucs/stats/stats.h>
17 }
18
19 #include <queue>
20
namespace ucp {
/* 32-bit marker constant made of the repeated byte 0xd7. How it is consumed
 * is not visible in this file. */
const uint32_t MAGIC = 0xd7d7d7d7U;
}
24
operator <<(std::ostream & os,const ucp_test_param & test_param)25 std::ostream& operator<<(std::ostream& os, const ucp_test_param& test_param)
26 {
27 std::vector<std::string>::const_iterator iter;
28 const std::vector<std::string>& transports = test_param.transports;
29 for (iter = transports.begin(); iter != transports.end(); ++iter) {
30 if (iter != transports.begin()) {
31 os << ",";
32 }
33 os << *iter;
34 }
35 return os;
36 }
37
/* Datatype produced by ucp_dt_make_contig(1) */
const ucp_datatype_t ucp_test::DATATYPE     = ucp_dt_make_contig(1);
/* Datatype produced by ucp_dt_make_iov() */
const ucp_datatype_t ucp_test::DATATYPE_IOV = ucp_dt_make_iov();
40
ucp_test()41 ucp_test::ucp_test() {
42 ucs_status_t status;
43 status = ucp_config_read(NULL, NULL, &m_ucp_config);
44 ASSERT_UCS_OK(status);
45 }
46
~ucp_test()47 ucp_test::~ucp_test() {
48
49 for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
50 iter != entities().end(); ++iter)
51 {
52 (*iter)->warn_existing_eps();
53 }
54 ucp_config_release(m_ucp_config);
55 }
56
cleanup()57 void ucp_test::cleanup() {
58 /* disconnect before destroying the entities */
59 for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
60 iter != entities().end(); ++iter)
61 {
62 disconnect(**iter);
63 }
64
65 for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
66 iter != entities().end(); ++iter)
67 {
68 (*iter)->cleanup();
69 }
70
71 m_entities.clear();
72 }
73
init()74 void ucp_test::init() {
75 test_base::init();
76
77 create_entity();
78 if (!is_self()) {
79 create_entity();
80 }
81 }
82
/**
 * Checks whether a transport name appears in a list of transport names.
 *
 * @param check_tl_name  Transport name to look for (exact string match).
 * @param tl_names       List of transport names to search.
 *
 * @return true if @a check_tl_name equals one of the entries of @a tl_names,
 *         false otherwise (including when the list is empty).
 */
static bool check_transport(const std::string& check_tl_name,
                            const std::vector<std::string>& tl_names) {
    /* the name is taken by const reference to avoid copying it on each call */
    return (std::find(tl_names.begin(), tl_names.end(),
                      check_tl_name) != tl_names.end());
}
88
has_transport(const std::string & tl_name) const89 bool ucp_test::has_transport(const std::string& tl_name) const {
90 return check_transport(tl_name, GetParam().transports);
91 }
92
has_any_transport(const std::vector<std::string> & tl_names) const93 bool ucp_test::has_any_transport(const std::vector<std::string>& tl_names) const {
94 const std::vector<std::string>& all_tl_names = GetParam().transports;
95
96 return std::find_first_of(all_tl_names.begin(), all_tl_names.end(),
97 tl_names.begin(), tl_names.end()) !=
98 all_tl_names.end();
99 }
100
is_self() const101 bool ucp_test::is_self() const {
102 return "self" == GetParam().transports.front();
103 }
104
create_entity(bool add_in_front)105 ucp_test_base::entity* ucp_test::create_entity(bool add_in_front) {
106 return create_entity(add_in_front, GetParam());
107 }
108
109 ucp_test_base::entity*
create_entity(bool add_in_front,const ucp_test_param & test_param)110 ucp_test::create_entity(bool add_in_front, const ucp_test_param &test_param) {
111 entity *e = new entity(test_param, m_ucp_config, get_worker_params(), this);
112 if (add_in_front) {
113 m_entities.push_front(e);
114 } else {
115 m_entities.push_back(e);
116 }
117 return e;
118 }
119
get_ctx_params()120 ucp_params_t ucp_test::get_ctx_params() {
121 ucp_params_t params;
122 memset(¶ms, 0, sizeof(params));
123 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
124 return params;
125 }
126
get_worker_params()127 ucp_worker_params_t ucp_test::get_worker_params() {
128 ucp_worker_params_t params;
129 memset(¶ms, 0, sizeof(params));
130 params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
131 params.thread_mode = UCS_THREAD_MODE_MULTI;
132 return params;
133 }
134
get_ep_params()135 ucp_ep_params_t ucp_test::get_ep_params() {
136 ucp_ep_params_t params;
137 memset(¶ms, 0, sizeof(params));
138 return params;
139 }
140
progress(int worker_index) const141 unsigned ucp_test::progress(int worker_index) const {
142 unsigned count = 0;
143 for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
144 iter != entities().end(); ++iter)
145 {
146 count += (*iter)->progress(worker_index);
147 sched_yield();
148 }
149 return count;
150 }
151
short_progress_loop(int worker_index) const152 void ucp_test::short_progress_loop(int worker_index) const {
153 for (unsigned i = 0; i < 100; ++i) {
154 progress(worker_index);
155 usleep(100);
156 }
157 }
158
flush_ep(const entity & e,int worker_index,int ep_index)159 void ucp_test::flush_ep(const entity &e, int worker_index, int ep_index)
160 {
161 void *request = e.flush_ep_nb(worker_index, ep_index);
162 wait(request, worker_index);
163 }
164
flush_worker(const entity & e,int worker_index)165 void ucp_test::flush_worker(const entity &e, int worker_index)
166 {
167 void *request = e.flush_worker_nb(worker_index);
168 wait(request, worker_index);
169 }
170
disconnect(entity & e)171 void ucp_test::disconnect(entity& e) {
172 bool has_failed_entity = false;
173 for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
174 !has_failed_entity && (iter != entities().end()); ++iter) {
175 has_failed_entity = ((*iter)->get_err_num() > 0);
176 }
177
178 for (int i = 0; i < e.get_num_workers(); i++) {
179 enum ucp_ep_close_mode close_mode;
180
181 if (has_failed_entity) {
182 close_mode = UCP_EP_CLOSE_MODE_FORCE;
183 } else {
184 flush_worker(e, i);
185 close_mode = UCP_EP_CLOSE_MODE_FLUSH;
186 }
187
188 e.close_all_eps(*this, i, close_mode);
189 }
190 }
191
wait(void * req,int worker_index)192 void ucp_test::wait(void *req, int worker_index)
193 {
194 if (req == NULL) {
195 return;
196 }
197
198 if (UCS_PTR_IS_ERR(req)) {
199 ucs_error("operation returned error: %s",
200 ucs_status_string(UCS_PTR_STATUS(req)));
201 return;
202 }
203
204 ucs_status_t status;
205 ucs_time_t deadline = ucs::get_deadline();
206 do {
207 progress(worker_index);
208 status = ucp_request_check_status(req);
209 } while ((status == UCS_INPROGRESS) && (ucs_get_time() < deadline));
210
211 if (status != UCS_OK) {
212 /* UCS errors are suppressed in case of error handling tests */
213 ucs_error("request %p completed with error %s", req,
214 ucs_status_string(status));
215 }
216
217 ucp_request_release(req);
218 }
219
set_ucp_config(ucp_config_t * config)220 void ucp_test::set_ucp_config(ucp_config_t *config) {
221 set_ucp_config(config, GetParam());
222 }
223
max_connections()224 int ucp_test::max_connections() {
225 if (has_transport("tcp")) {
226 return ucs::max_tcp_connections();
227 } else {
228 return std::numeric_limits<int>::max();
229 }
230 }
231
232 std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)233 ucp_test::enum_test_params(const ucp_params_t& ctx_params,
234 const std::string& name,
235 const std::string& test_case_name,
236 const std::string& tls)
237 {
238 ucp_test_param test_param;
239 std::stringstream ss(tls);
240
241 test_param.ctx_params = ctx_params;
242 test_param.variant = DEFAULT_PARAM_VARIANT;
243 test_param.thread_type = SINGLE_THREAD;
244
245 while (ss.good()) {
246 std::string tl_name;
247 std::getline(ss, tl_name, ',');
248 test_param.transports.push_back(tl_name);
249 }
250
251 if (check_test_param(name, test_case_name, test_param)) {
252 return std::vector<ucp_test_param>(1, test_param);
253 } else {
254 return std::vector<ucp_test_param>();
255 }
256 }
257
generate_test_params_variant(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls,int variant,std::vector<ucp_test_param> & test_params,int thread_type)258 void ucp_test::generate_test_params_variant(const ucp_params_t& ctx_params,
259 const std::string& name,
260 const std::string& test_case_name,
261 const std::string& tls,
262 int variant,
263 std::vector<ucp_test_param>& test_params,
264 int thread_type)
265 {
266 std::vector<ucp_test_param> tmp_test_params;
267
268 tmp_test_params = ucp_test::enum_test_params(ctx_params,name,
269 test_case_name, tls);
270 for (std::vector<ucp_test_param>::iterator iter = tmp_test_params.begin();
271 iter != tmp_test_params.end(); ++iter)
272 {
273 iter->variant = variant;
274 iter->thread_type = thread_type;
275 test_params.push_back(*iter);
276 }
277 }
278
set_ucp_config(ucp_config_t * config,const ucp_test_param & test_param)279 void ucp_test::set_ucp_config(ucp_config_t *config,
280 const ucp_test_param& test_param)
281 {
282 std::stringstream ss;
283 ss << test_param;
284 ucp_config_modify(config, "TLS", ss.str().c_str());
285 /* prevent configuration warnings in the UCP testing */
286 ucp_config_modify(config, "WARN_INVALID_CONFIG", "no");
287 }
288
modify_config(const std::string & name,const std::string & value,bool optional)289 void ucp_test::modify_config(const std::string& name, const std::string& value,
290 bool optional)
291 {
292 ucs_status_t status;
293
294 status = ucp_config_modify(m_ucp_config, name.c_str(), value.c_str());
295 if (status == UCS_ERR_NO_ELEM) {
296 test_base::modify_config(name, value, optional);
297 } else if (status != UCS_OK) {
298 UCS_TEST_ABORT("Couldn't modify ucp config parameter: " <<
299 name.c_str() << " to " << value.c_str() << ": " <<
300 ucs_status_string(status));
301 }
302 }
303
/**
 * Re-initializes the statistics subsystem with a pushed configuration whose
 * STATS_DEST is "file:/dev/null" and STATS_TRIGGER is "exit", and asserts that
 * statistics are active afterwards. The pushed configuration is popped again
 * by stats_restore().
 */
void ucp_test::stats_activate()
{
    /* statistics are cleaned up before the configuration changes and
     * initialized again after it */
    ucs_stats_cleanup();
    push_config();
    modify_config("STATS_DEST",    "file:/dev/null");
    modify_config("STATS_TRIGGER", "exit");
    ucs_stats_init();
    ASSERT_TRUE(ucs_stats_is_active());
}
313
/**
 * Cleans up the statistics subsystem, pops the most recently pushed
 * configuration and initializes statistics again.
 */
void ucp_test::stats_restore()
{
    ucs_stats_cleanup();
    pop_config();
    ucs_stats_init();
}
320
321
check_test_param(const std::string & name,const std::string & test_case_name,const ucp_test_param & test_param)322 bool ucp_test::check_test_param(const std::string& name,
323 const std::string& test_case_name,
324 const ucp_test_param& test_param)
325 {
326 typedef std::map<std::string, bool> cache_t;
327 static cache_t cache;
328
329 if (test_param.transports.empty()) {
330 return false;
331 }
332
333 cache_t::iterator iter = cache.find(name);
334 if (iter != cache.end()) {
335 return iter->second;
336 }
337
338 ucs::handle<ucp_config_t*> config;
339 UCS_TEST_CREATE_HANDLE(ucp_config_t*, config, ucp_config_release,
340 ucp_config_read, NULL, NULL);
341 set_ucp_config(config, test_param);
342
343 ucp_context_h ucph;
344 ucs_status_t status;
345 {
346 scoped_log_handler slh(hide_errors_logger);
347 status = ucp_init(&test_param.ctx_params, config, &ucph);
348 }
349
350 bool result;
351 if (status == UCS_OK) {
352 ucp_cleanup(ucph);
353 result = true;
354 } else if (status == UCS_ERR_NO_DEVICE) {
355 result = false;
356 } else {
357 UCS_TEST_ABORT("Failed to create context (" << test_case_name << "): "
358 << ucs_status_string(status));
359 }
360
361 UCS_TEST_MESSAGE << "checking " << name << ": " << (result ? "yes" : "no");
362 cache[name] = result;
363 return result;
364 }
365
/**
 * Creates the UCP context and worker(s) of a test entity.
 *
 * The thread type of @a test_param selects the setup:
 *  - MULTI_THREAD_CONTEXT: MT_TEST_NUM_THREADS workers, mt_workers_shared=1,
 *    single thread mode per worker;
 *  - MULTI_THREAD_WORKER: one worker, mt_workers_shared=0, multi thread mode;
 *  - anything else: one worker, mt_workers_shared=0, single thread mode.
 *
 * @param test_param     Test parameter (transports, context params, thread type).
 * @param ucp_config     UCP configuration; modified by set_ucp_config().
 * @param worker_params  Base worker parameters; a local copy is adjusted.
 * @param test_owner     Owning test (not used by the code in this constructor).
 */
ucp_test_base::entity::entity(const ucp_test_param& test_param,
                              ucp_config_t* ucp_config,
                              const ucp_worker_params_t& worker_params,
                              const ucp_test_base *test_owner)
    : m_err_cntr(0), m_rejected_cntr(0)
{
    ucp_test_param entity_param             = test_param;
    ucp_worker_params_t local_worker_params = worker_params;
    int num_workers                         = 1;

    /* start from the single-thread setup and adjust for the MT variants */
    entity_param.ctx_params.mt_workers_shared = 0;
    local_worker_params.thread_mode           = UCS_THREAD_MODE_SINGLE;

    if (test_param.thread_type == MULTI_THREAD_CONTEXT) {
        num_workers                               = MT_TEST_NUM_THREADS;
        entity_param.ctx_params.mt_workers_shared = 1;
    } else if (test_param.thread_type == MULTI_THREAD_WORKER) {
        local_worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
    }

    entity_param.ctx_params.field_mask |= UCP_PARAM_FIELD_MT_WORKERS_SHARED;
    local_worker_params.field_mask     |= UCP_WORKER_PARAM_FIELD_THREAD_MODE;

    ucp_test::set_ucp_config(ucp_config, entity_param);

    {
        scoped_log_handler slh(hide_errors_logger);
        UCS_TEST_CREATE_HANDLE_IF_SUPPORTED(ucp_context_h, m_ucph, ucp_cleanup,
                                            ucp_init, &entity_param.ctx_params,
                                            ucp_config);
    }

    m_workers.resize(num_workers);
    for (int worker_idx = 0; worker_idx < num_workers; worker_idx++) {
        UCS_TEST_CREATE_HANDLE(ucp_worker_h, m_workers[worker_idx].first,
                               ucp_worker_destroy, ucp_worker_create, m_ucph,
                               &local_worker_params);
    }
}
409
~entity()410 ucp_test_base::entity::~entity() {
411 cleanup();
412 }
413
connect(const entity * other,const ucp_ep_params_t & ep_params,int ep_idx,int do_set_ep)414 void ucp_test_base::entity::connect(const entity* other,
415 const ucp_ep_params_t& ep_params,
416 int ep_idx, int do_set_ep) {
417 assert(get_num_workers() == other->get_num_workers());
418 for (unsigned i = 0; i < unsigned(get_num_workers()); i++) {
419 ucs_status_t status;
420 ucp_address_t *address;
421 size_t address_length;
422 ucp_ep_h ep;
423
424 status = ucp_worker_get_address(other->worker(i), &address, &address_length);
425 ASSERT_UCS_OK(status);
426
427 {
428 scoped_log_handler slh(hide_errors_logger);
429
430 ucp_ep_params_t local_ep_params = ep_params;
431 local_ep_params.field_mask |= UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
432 local_ep_params.address = address;
433
434 status = ucp_ep_create(m_workers[i].first, &local_ep_params, &ep);
435 }
436
437 if (status == UCS_ERR_UNREACHABLE) {
438 ucp_worker_release_address(other->worker(i), address);
439 UCS_TEST_SKIP_R(m_errors.empty() ? "Unreachable" : m_errors.back());
440 }
441
442 ASSERT_UCS_OK(status, << " (" << m_errors.back() << ")");
443
444 if (do_set_ep) {
445 set_ep(ep, i, ep_idx);
446 }
447
448 ucp_worker_release_address(other->worker(i), address);
449 }
450 }
451
452 /*
453 * Checks if the client's address matches any IP address on the server's side.
454 */
verify_client_address(struct sockaddr_storage * client_address)455 bool ucp_test_base::entity::verify_client_address(struct sockaddr_storage
456 *client_address)
457 {
458 struct ifaddrs* ifaddrs;
459
460 if (getifaddrs(&ifaddrs) != 0) {
461 return false;
462 }
463
464 for (struct ifaddrs *ifa = ifaddrs; ifa != NULL; ifa = ifa->ifa_next) {
465 if (ucs_netif_flags_is_active(ifa->ifa_flags) &&
466 ucs::is_inet_addr(ifa->ifa_addr))
467 {
468 if (!ucs_sockaddr_ip_cmp((const struct sockaddr*)client_address,
469 ifa->ifa_addr)) {
470 freeifaddrs(ifaddrs);
471 return true;
472 }
473 }
474 }
475
476 freeifaddrs(ifaddrs);
477 return false;
478 }
479
accept(ucp_worker_h worker,ucp_conn_request_h conn_request)480 ucp_ep_h ucp_test_base::entity::accept(ucp_worker_h worker,
481 ucp_conn_request_h conn_request)
482 {
483 ucp_ep_params_t ep_params = *m_server_ep_params;
484 ucp_conn_request_attr_t attr;
485 ucs_status_t status;
486 ucp_ep_h ep;
487
488 attr.field_mask = UCP_CONN_REQUEST_ATTR_FIELD_CLIENT_ADDR;
489 status = ucp_conn_request_query(conn_request, &attr);
490 EXPECT_TRUE((status == UCS_OK) || (status == UCS_ERR_UNSUPPORTED));
491 if (status == UCS_OK) {
492 EXPECT_TRUE(verify_client_address(&attr.client_address));
493 }
494
495 ep_params.field_mask |= UCP_EP_PARAM_FIELD_CONN_REQUEST |
496 UCP_EP_PARAM_FIELD_USER_DATA;
497 ep_params.user_data = reinterpret_cast<void*>(this);
498 ep_params.conn_request = conn_request;
499
500 status = ucp_ep_create(worker, &ep_params, &ep);
501 if (status == UCS_ERR_UNREACHABLE) {
502 UCS_TEST_SKIP_R("Skipping due an unreachable destination (unsupported "
503 "feature or no supported transport to send partial "
504 "worker address)");
505 }
506 ASSERT_UCS_OK(status);
507 return ep;
508 }
509
510
modify_ep(const ucp_ep_params_t & ep_params,int worker_idx,int ep_idx)511 void* ucp_test_base::entity::modify_ep(const ucp_ep_params_t& ep_params,
512 int worker_idx, int ep_idx) {
513 return ucp_ep_modify_nb(ep(worker_idx, ep_idx), &ep_params);
514 }
515
516
set_ep(ucp_ep_h ep,int worker_index,int ep_index)517 void ucp_test_base::entity::set_ep(ucp_ep_h ep, int worker_index, int ep_index)
518 {
519 if (ep_index < get_num_eps(worker_index)) {
520 m_workers[worker_index].second[ep_index].reset(ep, ep_destructor, this);
521 } else {
522 m_workers[worker_index].second.push_back(
523 ucs::handle<ucp_ep_h, entity *>(ep, ucp_ep_destroy));
524 }
525 }
526
empty_send_completion(void * r,ucs_status_t status)527 void ucp_test_base::entity::empty_send_completion(void *r, ucs_status_t status) {
528 }
529
accept_ep_cb(ucp_ep_h ep,void * arg)530 void ucp_test_base::entity::accept_ep_cb(ucp_ep_h ep, void *arg) {
531 entity *self = reinterpret_cast<entity*>(arg);
532 int worker_index = 0; /* TODO pass worker index in arg */
533
534 /* take error handler from test fixture and add user data */
535 ucp_ep_params_t ep_params = *self->m_server_ep_params;
536 ep_params.field_mask &= UCP_EP_PARAM_FIELD_ERR_HANDLER;
537 ep_params.field_mask |= UCP_EP_PARAM_FIELD_USER_DATA;
538 ep_params.user_data = reinterpret_cast<void*>(self);
539
540 void *req = ucp_ep_modify_nb(ep, &ep_params);
541 ASSERT_UCS_PTR_OK(req); /* don't expect this operation to block */
542
543 self->set_ep(ep, worker_index, self->get_num_eps(worker_index));
544 }
545
accept_conn_cb(ucp_conn_request_h conn_req,void * arg)546 void ucp_test_base::entity::accept_conn_cb(ucp_conn_request_h conn_req, void* arg)
547 {
548 entity *self = reinterpret_cast<entity*>(arg);
549 self->m_conn_reqs.push(conn_req);
550 }
551
reject_conn_cb(ucp_conn_request_h conn_req,void * arg)552 void ucp_test_base::entity::reject_conn_cb(ucp_conn_request_h conn_req, void* arg)
553 {
554 entity *self = reinterpret_cast<entity*>(arg);
555 ucp_listener_reject(self->m_listener, conn_req);
556 self->m_rejected_cntr++;
557 }
558
flush_ep_nb(int worker_index,int ep_index) const559 void* ucp_test_base::entity::flush_ep_nb(int worker_index, int ep_index) const {
560 return ucp_ep_flush_nb(ep(worker_index, ep_index), 0, empty_send_completion);
561 }
562
flush_worker_nb(int worker_index) const563 void* ucp_test_base::entity::flush_worker_nb(int worker_index) const {
564 if (worker(worker_index) == NULL) {
565 return NULL;
566 }
567 return ucp_worker_flush_nb(worker(worker_index), 0, empty_send_completion);
568 }
569
fence(int worker_index) const570 void ucp_test_base::entity::fence(int worker_index) const {
571 ucs_status_t status = ucp_worker_fence(worker(worker_index));
572 ASSERT_UCS_OK(status);
573 }
574
disconnect_nb(int worker_index,int ep_index,enum ucp_ep_close_mode mode)575 void *ucp_test_base::entity::disconnect_nb(int worker_index, int ep_index,
576 enum ucp_ep_close_mode mode) {
577 ucp_ep_h ep = revoke_ep(worker_index, ep_index);
578 if (ep == NULL) {
579 return NULL;
580 }
581
582 void *req = ucp_ep_close_nb(ep, mode);
583 if (UCS_PTR_IS_PTR(req)) {
584 m_close_ep_reqs.push_back(req);
585 return req;
586 }
587
588 ASSERT_UCS_OK(UCS_PTR_STATUS(req));
589 return NULL;
590 }
591
close_ep_req_free(void * close_req)592 void ucp_test_base::entity::close_ep_req_free(void *close_req) {
593 if (close_req == NULL) {
594 return;
595 }
596
597 ucs_status_t status = UCS_PTR_IS_ERR(close_req) ? UCS_PTR_STATUS(close_req) :
598 ucp_request_check_status(close_req);
599 ASSERT_NE(UCS_INPROGRESS, status) << "free not completed EP close request";
600 if (status != UCS_OK) {
601 UCS_TEST_MESSAGE << "ucp_ep_close_nb completed with status "
602 << ucs_status_string(status);
603 }
604
605 m_close_ep_reqs.erase(std::find(m_close_ep_reqs.begin(),
606 m_close_ep_reqs.end(), close_req));
607 ucp_request_free(close_req);
608 }
609
close_all_eps(const ucp_test & test,int worker_idx,enum ucp_ep_close_mode mode)610 void ucp_test_base::entity::close_all_eps(const ucp_test &test, int worker_idx,
611 enum ucp_ep_close_mode mode) {
612 for (int j = 0; j < get_num_eps(worker_idx); j++) {
613 disconnect_nb(worker_idx, j, mode);
614 }
615
616 ucs_time_t deadline = ucs::get_deadline();
617 while (!m_close_ep_reqs.empty() && (ucs_get_time() < deadline)) {
618 void *req = m_close_ep_reqs.front();
619 while (!is_request_completed(req)) {
620 test.progress(worker_idx);
621 }
622
623 close_ep_req_free(req);
624 }
625
626 EXPECT_TRUE(m_close_ep_reqs.empty()) << m_close_ep_reqs.size()
627 << " endpoints were not closed";
628 }
629
destroy_worker(int worker_index)630 void ucp_test_base::entity::destroy_worker(int worker_index) {
631 for (size_t i = 0; i < m_workers[worker_index].second.size(); ++i) {
632 m_workers[worker_index].second[i].revoke();
633 }
634 m_workers[worker_index].first.reset();
635 }
636
ep(int worker_index,int ep_index) const637 ucp_ep_h ucp_test_base::entity::ep(int worker_index, int ep_index) const {
638 if (size_t(worker_index) < m_workers.size()) {
639 if (size_t(ep_index) < m_workers[worker_index].second.size()) {
640 return m_workers[worker_index].second[ep_index];
641 }
642 }
643 return NULL;
644 }
645
revoke_ep(int worker_index,int ep_index) const646 ucp_ep_h ucp_test_base::entity::revoke_ep(int worker_index, int ep_index) const {
647 ucp_ep_h ucp_ep = ep(worker_index, ep_index);
648
649 if (ucp_ep) {
650 m_workers[worker_index].second[ep_index].revoke();
651 }
652
653 return ucp_ep;
654 }
655
listen(listen_cb_type_t cb_type,const struct sockaddr * saddr,socklen_t addrlen,const ucp_ep_params_t & ep_params,int worker_index)656 ucs_status_t ucp_test_base::entity::listen(listen_cb_type_t cb_type,
657 const struct sockaddr* saddr,
658 socklen_t addrlen,
659 const ucp_ep_params_t& ep_params,
660 int worker_index)
661 {
662 ucp_listener_params_t params;
663 ucp_listener_h listener;
664
665 params.field_mask = UCP_LISTENER_PARAM_FIELD_SOCK_ADDR;
666 params.sockaddr.addr = saddr;
667 params.sockaddr.addrlen = addrlen;
668
669 switch (cb_type) {
670 case LISTEN_CB_EP:
671 params.field_mask |= UCP_LISTENER_PARAM_FIELD_ACCEPT_HANDLER;
672 params.accept_handler.cb = accept_ep_cb;
673 params.accept_handler.arg = reinterpret_cast<void*>(this);
674 break;
675 case LISTEN_CB_CONN:
676 params.field_mask |= UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
677 params.conn_handler.cb = accept_conn_cb;
678 params.conn_handler.arg = reinterpret_cast<void*>(this);
679 break;
680 case LISTEN_CB_REJECT:
681 params.field_mask |= UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
682 params.conn_handler.cb = reject_conn_cb;
683 params.conn_handler.arg = reinterpret_cast<void*>(this);
684 break;
685 default:
686 UCS_TEST_ABORT("invalid test parameter");
687 }
688
689 m_server_ep_params.reset(new ucp_ep_params_t(ep_params),
690 ucs::deleter<ucp_ep_params_t>);
691
692 ucs_status_t status;
693 {
694 scoped_log_handler wrap_err(wrap_errors_logger);
695 status = ucp_listener_create(worker(worker_index), ¶ms, &listener);
696 }
697
698 if (status == UCS_OK) {
699 m_listener.reset(listener, ucp_listener_destroy);
700 } else {
701 /* throw error if status is not (UCS_OK or UCS_ERR_UNREACHABLE or
702 * UCS_ERR_BUSY).
703 * UCS_ERR_INVALID_PARAM may also return but then the test should fail */
704 EXPECT_TRUE((status == UCS_ERR_UNREACHABLE) ||
705 (status == UCS_ERR_BUSY)) << ucs_status_string(status);
706 }
707
708 return status;
709 }
710
worker(int worker_index) const711 ucp_worker_h ucp_test_base::entity::worker(int worker_index) const {
712 if (worker_index < get_num_workers()) {
713 return m_workers[worker_index].first;
714 } else {
715 return NULL;
716 }
717 }
718
ucph() const719 ucp_context_h ucp_test_base::entity::ucph() const {
720 return m_ucph;
721 }
722
listenerh() const723 ucp_listener_h ucp_test_base::entity::listenerh() const {
724 return m_listener;
725 }
726
progress(int worker_index)727 unsigned ucp_test_base::entity::progress(int worker_index)
728 {
729 ucp_worker_h ucp_worker = worker(worker_index);
730
731 if (ucp_worker == NULL) {
732 return 0;
733 }
734
735 unsigned progress_count = 0;
736 if (!m_conn_reqs.empty()) {
737 ucp_conn_request_h conn_req = m_conn_reqs.back();
738 m_conn_reqs.pop();
739 ucp_ep_h ep = accept(ucp_worker, conn_req);
740 set_ep(ep, worker_index, std::numeric_limits<int>::max());
741 ++progress_count;
742 }
743
744 return progress_count + ucp_worker_progress(ucp_worker);
745 }
746
get_num_workers() const747 int ucp_test_base::entity::get_num_workers() const {
748 return m_workers.size();
749 }
750
get_num_eps(int worker_index) const751 int ucp_test_base::entity::get_num_eps(int worker_index) const {
752 return m_workers[worker_index].second.size();
753 }
754
add_err(ucs_status_t status)755 void ucp_test_base::entity::add_err(ucs_status_t status) {
756 switch (status) {
757 case UCS_ERR_REJECTED:
758 ++m_rejected_cntr;
759 /* fall through */
760 default:
761 ++m_err_cntr;
762 }
763
764 EXPECT_EQ(1ul, m_err_cntr) << "error callback is called more than once";
765 }
766
get_err_num_rejected() const767 const size_t &ucp_test_base::entity::get_err_num_rejected() const {
768 return m_rejected_cntr;
769 }
770
get_err_num() const771 const size_t &ucp_test_base::entity::get_err_num() const {
772 return m_err_cntr;
773 }
774
warn_existing_eps() const775 void ucp_test_base::entity::warn_existing_eps() const {
776 for (size_t worker_index = 0; worker_index < m_workers.size(); ++worker_index) {
777 for (size_t ep_index = 0; ep_index < m_workers[worker_index].second.size();
778 ++ep_index) {
779 ADD_FAILURE() << "ep(" << worker_index << "," << ep_index <<
780 ")=" << m_workers[worker_index].second[ep_index].get() <<
781 " was not destroyed during test cleanup()";
782 }
783 }
784 }
785
set_ib_ud_timeout(double timeout_sec)786 double ucp_test_base::entity::set_ib_ud_timeout(double timeout_sec)
787 {
788 double prev_timeout_sec = 0.;
789 #if HAVE_IB
790 for (ucp_rsc_index_t rsc_index = 0;
791 rsc_index < ucph()->num_tls; ++rsc_index) {
792 ucp_worker_iface_t *wiface = ucp_worker_iface(worker(), rsc_index);
793 // check if the iface is ud transport
794 if (wiface->iface->ops.iface_flush == uct_ud_iface_flush) {
795 uct_ud_iface_t *iface =
796 ucs_derived_of(wiface->iface, uct_ud_iface_t);
797
798 uct_ud_enter(iface);
799 if (!prev_timeout_sec) {
800 prev_timeout_sec = ucs_time_to_sec(iface->config.peer_timeout);
801 }
802
803 iface->config.peer_timeout = ucs_time_from_sec(timeout_sec);
804 uct_ud_leave(iface);
805 }
806 }
807 #endif
808 return prev_timeout_sec;
809 }
810
/**
 * Releases the resources held by the entity: resets the listener handle and
 * then clears the list of workers (together with their endpoint handles).
 */
void ucp_test_base::entity::cleanup() {
    /* the listener is reset before the workers are cleared */
    m_listener.reset();
    m_workers.clear();
}
815
ep_destructor(ucp_ep_h ep,entity * e)816 void ucp_test_base::entity::ep_destructor(ucp_ep_h ep, entity *e)
817 {
818 ucs_status_ptr_t req = ucp_disconnect_nb(ep);
819 if (!UCS_PTR_IS_PTR(req)) {
820 return;
821 }
822
823 ucs_status_t status;
824 ucp_tag_recv_info_t info;
825 do {
826 e->progress();
827 status = ucp_request_test(req, &info);
828 } while (status == UCS_INPROGRESS);
829 EXPECT_EQ(UCS_OK, status);
830 ucp_request_release(req);
831 }
832
is_request_completed(void * request)833 bool ucp_test_base::is_request_completed(void *request) {
834 return (request == NULL) ||
835 (ucp_request_check_status(request) != UCS_INPROGRESS);
836 }
837
mapped_buffer(size_t size,const entity & entity,int flags,ucs_memory_type_t mem_type)838 ucp_test::mapped_buffer::mapped_buffer(size_t size, const entity& entity,
839 int flags, ucs_memory_type_t mem_type) :
840 mem_buffer(size, mem_type), m_entity(entity), m_memh(NULL),
841 m_rkey_buffer(NULL)
842 {
843 ucs_status_t status;
844
845 if (flags & (UCP_MEM_MAP_ALLOCATE|UCP_MEM_MAP_FIXED)) {
846 UCS_TEST_ABORT("mapped_buffer does not support allocation by UCP");
847 }
848
849 ucp_mem_map_params_t params;
850 params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
851 UCP_MEM_MAP_PARAM_FIELD_LENGTH |
852 UCP_MEM_MAP_PARAM_FIELD_FLAGS;
853 params.flags = flags;
854 params.address = ptr();
855 params.length = size;
856
857 status = ucp_mem_map(m_entity.ucph(), ¶ms, &m_memh);
858 ASSERT_UCS_OK(status);
859
860 size_t rkey_buffer_size;
861 status = ucp_rkey_pack(m_entity.ucph(), m_memh, &m_rkey_buffer,
862 &rkey_buffer_size);
863 ASSERT_UCS_OK(status);
864 }
865
~mapped_buffer()866 ucp_test::mapped_buffer::~mapped_buffer()
867 {
868 ucp_rkey_buffer_release(m_rkey_buffer);
869 ucs_status_t status = ucp_mem_unmap(m_entity.ucph(), m_memh);
870 EXPECT_UCS_OK(status);
871 }
872
rkey(const entity & entity) const873 ucs::handle<ucp_rkey_h> ucp_test::mapped_buffer::rkey(const entity& entity) const
874 {
875 ucp_rkey_h rkey;
876
877 ucs_status_t status = ucp_ep_rkey_unpack(entity.ep(), m_rkey_buffer, &rkey);
878 ASSERT_UCS_OK(status);
879 return ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy);
880 }
881
memh() const882 ucp_mem_h ucp_test::mapped_buffer::memh() const
883 {
884 return m_memh;
885 }
886