1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include "test_ucp_tag.h"
8 #include "ucp_datatype.h"
9
10 extern "C" {
11 #include <ucp/core/ucp_ep.inl> /* for testing EP RNDV configuration */
12 #include <ucp/core/ucp_request.h> /* for debug */
13 #include <ucp/core/ucp_worker.h> /* for testing memory consumption */
14 }
15
16 class test_ucp_peer_failure : public ucp_test {
17 public:
18 test_ucp_peer_failure();
19
20 static std::vector<ucp_test_param>
21 enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
22 const std::string& test_case_name, const std::string& tls);
23
24 ucp_ep_params_t get_ep_params();
25
26 protected:
27 enum {
28 TEST_TAG = UCS_BIT(0),
29 TEST_RMA = UCS_BIT(1),
30 FAIL_IMM = UCS_BIT(2)
31 };
32
33 enum {
34 STABLE_EP_INDEX,
35 FAILING_EP_INDEX
36 };
37
38 typedef ucs::handle<ucp_mem_h, ucp_context_h> mem_handle_t;
39
40 void set_timeouts();
41 static void err_cb(void *arg, ucp_ep_h ep, ucs_status_t status);
42 ucp_ep_h stable_sender();
43 ucp_ep_h failing_sender();
44 entity& stable_receiver();
45 entity& failing_receiver();
46 void *send_nb(ucp_ep_h ep, ucp_rkey_h rkey);
47 void *recv_nb(entity& e);
48 static ucs_log_func_rc_t
49 warn_unreleased_rdesc_handler(const char *file, unsigned line,
50 const char *function,
51 ucs_log_level_t level,
52 const ucs_log_component_config_t *comp_conf,
53 const char *message, va_list ap);
54 void fail_receiver();
55 void smoke_test(bool stable_pair);
56 static void unmap_memh(ucp_mem_h memh, ucp_context_h context);
57 void get_rkey(ucp_ep_h ep, entity& dst, mem_handle_t& memh,
58 ucs::handle<ucp_rkey_h>& rkey);
59 void set_rkeys();
60 static void send_cb(void *request, ucs_status_t status);
61 static void recv_cb(void *request, ucs_status_t status,
62 ucp_tag_recv_info_t *info);
63
64 virtual void cleanup();
65
66 void do_test(size_t msg_size, int pre_msg_count, bool force_close,
67 bool request_must_fail);
68
69 size_t m_err_count;
70 ucs_status_t m_err_status;
71 std::string m_sbuf, m_rbuf;
72 mem_handle_t m_stable_memh, m_failing_memh;
73 ucs::handle<ucp_rkey_h> m_stable_rkey, m_failing_rkey;
74 ucs::ptr_vector<ucs::scoped_setenv> m_env;
75 };
76
UCP_INSTANTIATE_TEST_CASE(test_ucp_peer_failure)77 UCP_INSTANTIATE_TEST_CASE(test_ucp_peer_failure)
78
79
80 test_ucp_peer_failure::test_ucp_peer_failure() : m_err_count(0), m_err_status(UCS_OK) {
81 ucs::fill_random(m_sbuf);
82 set_timeouts();
83 }
84
85 std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)86 test_ucp_peer_failure::enum_test_params(const ucp_params_t& ctx_params,
87 const std::string& name,
88 const std::string& test_case_name,
89 const std::string& tls)
90 {
91 std::vector<ucp_test_param> result;
92
93 ucp_params_t params = ucp_test::get_ctx_params();
94
95 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
96
97 params.features = UCP_FEATURE_TAG;
98 generate_test_params_variant(params, name, test_case_name + "/tag", tls,
99 TEST_TAG, result);
100 generate_test_params_variant(params, name, test_case_name + "/tag_fail_imm",
101 tls, TEST_TAG | FAIL_IMM, result);
102
103 params.features = UCP_FEATURE_RMA;
104 generate_test_params_variant(params, name, test_case_name + "/rma", tls,
105 TEST_RMA, result);
106 generate_test_params_variant(params, name, test_case_name + "/rma_fail_imm",
107 tls, TEST_RMA | FAIL_IMM, result);
108
109 return result;
110 }
111
get_ep_params()112 ucp_ep_params_t test_ucp_peer_failure::get_ep_params() {
113 ucp_ep_params_t params;
114 memset(¶ms, 0, sizeof(params));
115 params.field_mask = UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
116 UCP_EP_PARAM_FIELD_ERR_HANDLER;
117 params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
118 params.err_handler.cb = err_cb;
119 params.err_handler.arg = reinterpret_cast<void*>(this);
120 return params;
121 }
122
set_timeouts()123 void test_ucp_peer_failure::set_timeouts() {
124 /* Set small TL timeouts to reduce testing time */
125 m_env.push_back(new ucs::scoped_setenv("UCX_RC_TIMEOUT", "10ms"));
126 m_env.push_back(new ucs::scoped_setenv("UCX_RC_RNR_TIMEOUT", "10ms"));
127 m_env.push_back(new ucs::scoped_setenv("UCX_RC_RETRY_COUNT", "2"));
128 }
129
err_cb(void * arg,ucp_ep_h ep,ucs_status_t status)130 void test_ucp_peer_failure::err_cb(void *arg, ucp_ep_h ep, ucs_status_t status) {
131 test_ucp_peer_failure *self = reinterpret_cast<test_ucp_peer_failure*>(arg);
132 EXPECT_EQ(UCS_ERR_ENDPOINT_TIMEOUT, status);
133 self->m_err_status = status;
134 ++self->m_err_count;
135 }
136
stable_sender()137 ucp_ep_h test_ucp_peer_failure::stable_sender() {
138 return sender().ep(0, STABLE_EP_INDEX);
139 }
140
failing_sender()141 ucp_ep_h test_ucp_peer_failure::failing_sender() {
142 return sender().ep(0, FAILING_EP_INDEX);
143 }
144
stable_receiver()145 ucp_test::entity& test_ucp_peer_failure::stable_receiver() {
146 return m_entities.at(m_entities.size() - 2);
147 }
148
failing_receiver()149 ucp_test::entity& test_ucp_peer_failure::failing_receiver() {
150 return m_entities.at(m_entities.size() - 1);
151 }
152
send_nb(ucp_ep_h ep,ucp_rkey_h rkey)153 void *test_ucp_peer_failure::send_nb(ucp_ep_h ep, ucp_rkey_h rkey) {
154 if (GetParam().variant & TEST_TAG) {
155 return ucp_tag_send_nb(ep, &m_sbuf[0], m_sbuf.size(), DATATYPE, 0,
156 send_cb);
157 } else if (GetParam().variant & TEST_RMA) {
158 return ucp_put_nb(ep, &m_sbuf[0], m_sbuf.size(), (uintptr_t)&m_rbuf[0],
159 rkey, send_cb);
160 } else {
161 ucs_fatal("invalid test case");
162 }
163 }
164
recv_nb(entity & e)165 void *test_ucp_peer_failure::recv_nb(entity& e) {
166 ucs_assert(m_rbuf.size() >= m_sbuf.size());
167 if (GetParam().variant & TEST_TAG) {
168 return ucp_tag_recv_nb(e.worker(), &m_rbuf[0], m_rbuf.size(), DATATYPE, 0,
169 0, recv_cb);
170 } else if (GetParam().variant & TEST_RMA) {
171 return NULL;
172 } else {
173 ucs_fatal("invalid test case");
174 }
175 }
176
177 ucs_log_func_rc_t
warn_unreleased_rdesc_handler(const char * file,unsigned line,const char * function,ucs_log_level_t level,const ucs_log_component_config_t * comp_conf,const char * message,va_list ap)178 test_ucp_peer_failure::warn_unreleased_rdesc_handler(const char *file, unsigned line,
179 const char *function,
180 ucs_log_level_t level,
181 const ucs_log_component_config_t *comp_conf,
182 const char *message, va_list ap)
183 {
184 if (level == UCS_LOG_LEVEL_WARN) {
185 std::string err_str = format_message(message, ap);
186
187 if (err_str.find("unexpected tag-receive descriptor") != std::string::npos) {
188 return UCS_LOG_FUNC_RC_STOP;
189 }
190 }
191
192 return UCS_LOG_FUNC_RC_CONTINUE;
193 }
194
/*
 * Simulate a peer failure: flush the failing receiver's worker, release its
 * memory registration and destroy the entity, so later operations on
 * failing_sender() target a dead peer.
 */
void test_ucp_peer_failure::fail_receiver() {
    /* TODO: need to handle non-empty TX window in UD EP destructor,
     * see debug message (ud_ep.c:220)
     * ucs_debug("ep=%p id=%d conn_id=%d has %d unacked packets",
     *           self, self->ep_id, self->conn_id,
     *           (int)ucs_queue_length(&self->tx.window));
     */
    // TODO use force-close to close connections
    flush_worker(failing_receiver());
    /* unmap the RMA registration (if any) before the entity is destroyed */
    m_failing_memh.reset();
    {
        /* transform warning messages about unreleased TM rdescs to test
         * message that are expected here, since we closed receiver w/o
         * reading the messages that were potentially received */
        scoped_log_handler slh(warn_unreleased_rdesc_handler);
        failing_receiver().cleanup();
    }
}
213
smoke_test(bool stable_pair)214 void test_ucp_peer_failure::smoke_test(bool stable_pair) {
215 void *rreq = recv_nb(stable_pair ? stable_receiver() : failing_receiver());
216 void *sreq = send_nb(stable_pair ? stable_sender() : failing_sender(),
217 stable_pair ? m_stable_rkey : m_failing_rkey);
218 wait(sreq);
219 wait(rreq);
220 EXPECT_EQ(m_sbuf, m_rbuf);
221 }
222
unmap_memh(ucp_mem_h memh,ucp_context_h context)223 void test_ucp_peer_failure::unmap_memh(ucp_mem_h memh, ucp_context_h context)
224 {
225 ucs_status_t status = ucp_mem_unmap(context, memh);
226 if (status != UCS_OK) {
227 ucs_warn("failed to unmap memory: %s", ucs_status_string(status));
228 }
229 }
230
get_rkey(ucp_ep_h ep,entity & dst,mem_handle_t & memh,ucs::handle<ucp_rkey_h> & rkey)231 void test_ucp_peer_failure::get_rkey(ucp_ep_h ep, entity& dst, mem_handle_t& memh,
232 ucs::handle<ucp_rkey_h>& rkey) {
233 ucp_mem_map_params_t params;
234
235 memset(¶ms, 0, sizeof(params));
236 params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
237 UCP_MEM_MAP_PARAM_FIELD_LENGTH;
238 params.address = &m_rbuf[0];
239 params.length = m_rbuf.size();
240
241 ucp_mem_h ucp_memh;
242 ucs_status_t status = ucp_mem_map(dst.ucph(), ¶ms, &ucp_memh);
243 ASSERT_UCS_OK(status);
244 memh.reset(ucp_memh, unmap_memh, dst.ucph());
245
246 void *rkey_buffer;
247 size_t rkey_buffer_size;
248 status = ucp_rkey_pack(dst.ucph(), memh, &rkey_buffer, &rkey_buffer_size);
249 ASSERT_UCS_OK(status);
250
251 ucp_rkey_h ucp_rkey;
252 status = ucp_ep_rkey_unpack(ep, rkey_buffer, &ucp_rkey);
253 ASSERT_UCS_OK(status);
254 rkey.reset(ucp_rkey, ucp_rkey_destroy);
255
256 ucp_rkey_buffer_release(rkey_buffer);
257 }
258
set_rkeys()259 void test_ucp_peer_failure::set_rkeys() {
260
261 if (GetParam().variant & TEST_RMA) {
262 get_rkey(failing_sender(), failing_receiver(), m_failing_memh,
263 m_failing_rkey);
264 get_rkey(stable_sender(), stable_receiver(), m_stable_memh,
265 m_stable_rkey);
266 }
267 }
268
send_cb(void * request,ucs_status_t status)269 void test_ucp_peer_failure::send_cb(void *request, ucs_status_t status)
270 {
271 }
272
recv_cb(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)273 void test_ucp_peer_failure::recv_cb(void *request, ucs_status_t status,
274 ucp_tag_recv_info_t *info)
275 {
276 }
277
/* Release the test's remote keys and memory registrations, then run the
 * base-class cleanup. */
void test_ucp_peer_failure::cleanup() {
    /* Remote keys are unpacked on sender endpoints; release them before
     * ucp_test::cleanup() tears down entities and their endpoints */
    m_failing_rkey.reset();
    m_stable_rkey.reset();
    /* Then unmap m_rbuf registrations on the receiver contexts */
    m_failing_memh.reset();
    m_stable_memh.reset();
    ucp_test::cleanup();
}
285
do_test(size_t msg_size,int pre_msg_count,bool force_close,bool request_must_fail)286 void test_ucp_peer_failure::do_test(size_t msg_size, int pre_msg_count,
287 bool force_close, bool request_must_fail)
288 {
289 skip_loopback();
290
291 m_sbuf.resize(msg_size);
292 m_rbuf.resize(msg_size);
293
294 /* connect 2 ep's from sender() to 2 receiver entities */
295 create_entity();
296 sender().connect(&stable_receiver(), get_ep_params(), STABLE_EP_INDEX);
297 sender().connect(&failing_receiver(), get_ep_params(), FAILING_EP_INDEX);
298
299 set_rkeys();
300
301 /* Since we don't want to test peer failure on a stable pair
302 * and don't expect EP timeout error on those EPs,
303 * run traffic on a stable pair to connect it */
304 smoke_test(true);
305
306 if (!(GetParam().variant & FAIL_IMM)) {
307 /* if not fail immediately, run traffic on failing pair to connect it */
308 smoke_test(false);
309 }
310
311 /* put some sends on the failing pair */
312 std::vector<void*> sreqs_pre;
313 for (int i = 0; i < pre_msg_count; ++i) {
314 progress();
315 void *req = send_nb(failing_sender(), m_failing_rkey);
316 ASSERT_FALSE(UCS_PTR_IS_ERR(req));
317 if (UCS_PTR_IS_PTR(req)) {
318 sreqs_pre.push_back(req);
319 }
320 }
321
322 EXPECT_EQ(UCS_OK, m_err_status);
323
324 /* Since UCT/UD EP has a SW implementation of reliablity on which peer
325 * failure mechanism is based, we should set small UCT/UD EP timeout
326 * for UCT/UD EPs for sender's UCP EP to reduce testing time */
327 double prev_ib_ud_timeout = sender().set_ib_ud_timeout(3.);
328
329 {
330 scoped_log_handler slh(wrap_errors_logger);
331
332 fail_receiver();
333
334 void *sreq = send_nb(failing_sender(), m_failing_rkey);
335
336 while (!m_err_count) {
337 progress();
338 }
339 EXPECT_NE(UCS_OK, m_err_status);
340
341 if (UCS_PTR_IS_PTR(sreq)) {
342 /* The request may either succeed or fail, even though the data is
343 * not * delivered - depends on when the error is detected on sender
344 * side and if zcopy/bcopy protocol is used. In any case, the
345 * request must complete, and all resources have to be released.
346 */
347 ucs_status_t status = ucp_request_check_status(sreq);
348 EXPECT_NE(UCS_INPROGRESS, status);
349 if (request_must_fail) {
350 EXPECT_EQ(m_err_status, status);
351 } else {
352 EXPECT_TRUE((m_err_status == status) || (UCS_OK == status));
353 }
354 ucp_request_release(sreq);
355 }
356
357 /* Additional sends must fail */
358 void *sreq2 = send_nb(failing_sender(), m_failing_rkey);
359 EXPECT_FALSE(UCS_PTR_IS_PTR(sreq2));
360 EXPECT_EQ(m_err_status, UCS_PTR_STATUS(sreq2));
361
362 if (force_close) {
363 unsigned allocd_eps_before =
364 ucs_strided_alloc_inuse_count(&sender().worker()->ep_alloc);
365
366 ucp_ep_h ep = sender().revoke_ep(0, FAILING_EP_INDEX);
367
368 m_failing_rkey.reset();
369
370 void *creq = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FORCE);
371 wait(creq);
372
373 unsigned allocd_eps_after =
374 ucs_strided_alloc_inuse_count(&sender().worker()->ep_alloc);
375
376 if (!(GetParam().variant & FAIL_IMM)) {
377 EXPECT_LT(allocd_eps_after, allocd_eps_before);
378 }
379 }
380
381 /* release requests */
382 while (!sreqs_pre.empty()) {
383 void *req = sreqs_pre.back();
384 sreqs_pre.pop_back();
385 EXPECT_NE(UCS_INPROGRESS, ucp_request_test(req, NULL));
386 ucp_request_release(req);
387 }
388 }
389
390 /* Since we won't test peer failure anymore, reset UCT/UD EP timeout to the
391 * default value to avoid possible UD EP timeout errors under high load */
392 sender().set_ib_ud_timeout(prev_ib_ud_timeout);
393
394 /* Check workability of stable pair */
395 smoke_test(true);
396
397 /* Check that TX polling is working well */
398 while (sender().progress());
399
400 /* Destroy rkeys before destroying the worker (which also destroys the
401 * endpoints) */
402 m_failing_rkey.reset();
403 m_stable_rkey.reset();
404
405 /* When all requests on sender are done we need to prevent LOCAL_FLUSH
406 * in test teardown. Receiver is killed and doesn't respond on FC requests
407 */
408 sender().destroy_worker();
409 }
410
/* 1KB messages with default protocol settings; no pre-posted sends, no force
 * close, and the post-failure send may complete with either status. */
UCS_TEST_P(test_ucp_peer_failure, basic) {
    const size_t msg_size          = UCS_KBYTE;
    const int    pre_msg_count     = 0;
    const bool   force_close       = false;
    const bool   request_must_fail = false;

    do_test(msg_size, pre_msg_count, force_close, request_must_fail);
}
417
/* An endpoint created with get_ep_params() (peer error handling) must have
 * every tag rendezvous threshold set to SIZE_MAX, i.e. rendezvous off. */
UCS_TEST_P(test_ucp_peer_failure, rndv_disable) {
    const size_t size_max = std::numeric_limits<size_t>::max();

    sender().connect(&receiver(), get_ep_params(), STABLE_EP_INDEX);
    ucp_ep_h ep = sender().ep();

    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv.am_thresh);
    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv.rma_thresh);
    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv_send_nbr.am_thresh);
    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv_send_nbr.rma_thresh);
}
427
428 UCS_TEST_P(test_ucp_peer_failure, zcopy, "ZCOPY_THRESH=1023") {
429 do_test(UCS_KBYTE, /* msg_size */
430 0, /* pre_msg_cnt */
431 false, /* force_close */
432 true /* must_fail */);
433 }
434
435 UCS_TEST_P(test_ucp_peer_failure, bcopy_multi, "SEG_SIZE?=512", "RC_TM_ENABLE?=n") {
436 do_test(UCS_KBYTE, /* msg_size */
437 0, /* pre_msg_cnt */
438 false, /* force_close */
439 false /* must_fail */);
440 }
441
442 UCS_TEST_P(test_ucp_peer_failure, force_close, "RC_FC_ENABLE?=n") {
443 do_test(16000, /* msg_size */
444 1000, /* pre_msg_cnt */
445 true, /* force_close */
446 false /* must_fail */);
447 }
448
449 UCS_TEST_SKIP_COND_P(test_ucp_peer_failure, disable_sync_send,
450 !(GetParam().variant & TEST_TAG)) {
451 const size_t max_size = UCS_MBYTE;
452 std::vector<char> buf(max_size, 0);
453 void *req;
454
455 sender().connect(&receiver(), get_ep_params());
456
457 /* Make sure API is disabled for any size and data type */
458 for (size_t size = 1; size <= max_size; size *= 2) {
459 req = ucp_tag_send_sync_nb(sender().ep(), buf.data(), size, DATATYPE,
460 0x111337, NULL);
461 EXPECT_FALSE(UCS_PTR_IS_PTR(req));
462 EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(req));
463
464 ucp::data_type_desc_t dt_desc(DATATYPE_IOV, buf.data(), size);
465 req = ucp_tag_send_sync_nb(sender().ep(), dt_desc.buf(), dt_desc.count(),
466 dt_desc.dt(), 0x111337, NULL);
467 EXPECT_FALSE(UCS_PTR_IS_PTR(req));
468 EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(req));
469 }
470 }
471