1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2017.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #include "test_ucp_tag.h"
8 #include "ucp_datatype.h"
9 
10 extern "C" {
11 #include <ucp/core/ucp_ep.inl>    /* for testing EP RNDV configuration */
12 #include <ucp/core/ucp_request.h> /* for debug */
13 #include <ucp/core/ucp_worker.h>  /* for testing memory consumption */
14 }
15 
16 class test_ucp_peer_failure : public ucp_test {
17 public:
18     test_ucp_peer_failure();
19 
20     static std::vector<ucp_test_param>
21     enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
22                      const std::string& test_case_name, const std::string& tls);
23 
24     ucp_ep_params_t get_ep_params();
25 
26 protected:
27     enum {
28         TEST_TAG = UCS_BIT(0),
29         TEST_RMA = UCS_BIT(1),
30         FAIL_IMM = UCS_BIT(2)
31     };
32 
33     enum {
34         STABLE_EP_INDEX,
35         FAILING_EP_INDEX
36     };
37 
38     typedef ucs::handle<ucp_mem_h, ucp_context_h> mem_handle_t;
39 
40     void set_timeouts();
41     static void err_cb(void *arg, ucp_ep_h ep, ucs_status_t status);
42     ucp_ep_h stable_sender();
43     ucp_ep_h failing_sender();
44     entity& stable_receiver();
45     entity& failing_receiver();
46     void *send_nb(ucp_ep_h ep, ucp_rkey_h rkey);
47     void *recv_nb(entity& e);
48     static ucs_log_func_rc_t
49     warn_unreleased_rdesc_handler(const char *file, unsigned line,
50                                   const char *function,
51                                   ucs_log_level_t level,
52                                   const ucs_log_component_config_t *comp_conf,
53                                   const char *message, va_list ap);
54     void fail_receiver();
55     void smoke_test(bool stable_pair);
56     static void unmap_memh(ucp_mem_h memh, ucp_context_h context);
57     void get_rkey(ucp_ep_h ep, entity& dst, mem_handle_t& memh,
58                   ucs::handle<ucp_rkey_h>& rkey);
59     void set_rkeys();
60     static void send_cb(void *request, ucs_status_t status);
61     static void recv_cb(void *request, ucs_status_t status,
62                         ucp_tag_recv_info_t *info);
63 
64     virtual void cleanup();
65 
66     void do_test(size_t msg_size, int pre_msg_count, bool force_close,
67                  bool request_must_fail);
68 
69     size_t                              m_err_count;
70     ucs_status_t                        m_err_status;
71     std::string                         m_sbuf, m_rbuf;
72     mem_handle_t                        m_stable_memh, m_failing_memh;
73     ucs::handle<ucp_rkey_h>             m_stable_rkey, m_failing_rkey;
74     ucs::ptr_vector<ucs::scoped_setenv> m_env;
75 };
76 
UCP_INSTANTIATE_TEST_CASE(test_ucp_peer_failure)77 UCP_INSTANTIATE_TEST_CASE(test_ucp_peer_failure)
78 
79 
80 test_ucp_peer_failure::test_ucp_peer_failure() : m_err_count(0), m_err_status(UCS_OK) {
81     ucs::fill_random(m_sbuf);
82     set_timeouts();
83 }
84 
85 std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)86 test_ucp_peer_failure::enum_test_params(const ucp_params_t& ctx_params,
87                                         const std::string& name,
88                                         const std::string& test_case_name,
89                                         const std::string& tls)
90 {
91     std::vector<ucp_test_param> result;
92 
93     ucp_params_t params = ucp_test::get_ctx_params();
94 
95     params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
96 
97     params.features = UCP_FEATURE_TAG;
98     generate_test_params_variant(params, name, test_case_name + "/tag", tls,
99                                  TEST_TAG, result);
100     generate_test_params_variant(params, name, test_case_name + "/tag_fail_imm",
101                                  tls, TEST_TAG | FAIL_IMM, result);
102 
103     params.features = UCP_FEATURE_RMA;
104     generate_test_params_variant(params, name, test_case_name + "/rma", tls,
105                                  TEST_RMA, result);
106     generate_test_params_variant(params, name, test_case_name + "/rma_fail_imm",
107                                  tls, TEST_RMA | FAIL_IMM, result);
108 
109     return result;
110 }
111 
get_ep_params()112 ucp_ep_params_t test_ucp_peer_failure::get_ep_params() {
113     ucp_ep_params_t params;
114     memset(&params, 0, sizeof(params));
115     params.field_mask      = UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
116                              UCP_EP_PARAM_FIELD_ERR_HANDLER;
117     params.err_mode        = UCP_ERR_HANDLING_MODE_PEER;
118     params.err_handler.cb  = err_cb;
119     params.err_handler.arg = reinterpret_cast<void*>(this);
120     return params;
121 }
122 
set_timeouts()123 void test_ucp_peer_failure::set_timeouts() {
124     /* Set small TL timeouts to reduce testing time */
125     m_env.push_back(new ucs::scoped_setenv("UCX_RC_TIMEOUT",     "10ms"));
126     m_env.push_back(new ucs::scoped_setenv("UCX_RC_RNR_TIMEOUT", "10ms"));
127     m_env.push_back(new ucs::scoped_setenv("UCX_RC_RETRY_COUNT", "2"));
128 }
129 
err_cb(void * arg,ucp_ep_h ep,ucs_status_t status)130 void test_ucp_peer_failure::err_cb(void *arg, ucp_ep_h ep, ucs_status_t status) {
131     test_ucp_peer_failure *self = reinterpret_cast<test_ucp_peer_failure*>(arg);
132     EXPECT_EQ(UCS_ERR_ENDPOINT_TIMEOUT, status);
133     self->m_err_status = status;
134     ++self->m_err_count;
135 }
136 
stable_sender()137 ucp_ep_h test_ucp_peer_failure::stable_sender() {
138     return sender().ep(0, STABLE_EP_INDEX);
139 }
140 
failing_sender()141 ucp_ep_h test_ucp_peer_failure::failing_sender() {
142     return sender().ep(0, FAILING_EP_INDEX);
143 }
144 
stable_receiver()145 ucp_test::entity& test_ucp_peer_failure::stable_receiver() {
146     return m_entities.at(m_entities.size() - 2);
147 }
148 
failing_receiver()149 ucp_test::entity& test_ucp_peer_failure::failing_receiver() {
150     return m_entities.at(m_entities.size() - 1);
151 }
152 
send_nb(ucp_ep_h ep,ucp_rkey_h rkey)153 void *test_ucp_peer_failure::send_nb(ucp_ep_h ep, ucp_rkey_h rkey) {
154     if (GetParam().variant & TEST_TAG) {
155         return ucp_tag_send_nb(ep, &m_sbuf[0], m_sbuf.size(), DATATYPE, 0,
156                                send_cb);
157     } else if (GetParam().variant & TEST_RMA) {
158         return ucp_put_nb(ep, &m_sbuf[0], m_sbuf.size(), (uintptr_t)&m_rbuf[0],
159                           rkey, send_cb);
160     } else {
161         ucs_fatal("invalid test case");
162     }
163 }
164 
recv_nb(entity & e)165 void *test_ucp_peer_failure::recv_nb(entity& e) {
166     ucs_assert(m_rbuf.size() >= m_sbuf.size());
167     if (GetParam().variant & TEST_TAG) {
168         return ucp_tag_recv_nb(e.worker(), &m_rbuf[0], m_rbuf.size(), DATATYPE, 0,
169                                0, recv_cb);
170     } else if (GetParam().variant & TEST_RMA) {
171         return NULL;
172     } else {
173         ucs_fatal("invalid test case");
174     }
175 }
176 
177 ucs_log_func_rc_t
warn_unreleased_rdesc_handler(const char * file,unsigned line,const char * function,ucs_log_level_t level,const ucs_log_component_config_t * comp_conf,const char * message,va_list ap)178 test_ucp_peer_failure::warn_unreleased_rdesc_handler(const char *file, unsigned line,
179                                                      const char *function,
180                                                      ucs_log_level_t level,
181                                                      const ucs_log_component_config_t *comp_conf,
182                                                      const char *message, va_list ap)
183 {
184     if (level == UCS_LOG_LEVEL_WARN) {
185         std::string err_str = format_message(message, ap);
186 
187         if (err_str.find("unexpected tag-receive descriptor") != std::string::npos) {
188             return UCS_LOG_FUNC_RC_STOP;
189         }
190     }
191 
192     return UCS_LOG_FUNC_RC_CONTINUE;
193 }
194 
fail_receiver()195 void test_ucp_peer_failure::fail_receiver() {
196     /* TODO: need to handle non-empty TX window in UD EP destructor",
197      *       see debug message (ud_ep.c:220)
198      *       ucs_debug("ep=%p id=%d conn_id=%d has %d unacked packets",
199      *                 self, self->ep_id, self->conn_id,
200      *                 (int)ucs_queue_length(&self->tx.window));
201      */
202     // TODO use force-close to close connections
203     flush_worker(failing_receiver());
204     m_failing_memh.reset();
205     {
206         /* transform warning messages about unreleased TM rdescs to test
207          * message that are expected here, since we closed receiver w/o
208          * reading the messages that were potentially received */
209         scoped_log_handler slh(warn_unreleased_rdesc_handler);
210         failing_receiver().cleanup();
211     }
212 }
213 
smoke_test(bool stable_pair)214 void test_ucp_peer_failure::smoke_test(bool stable_pair) {
215     void *rreq = recv_nb(stable_pair ? stable_receiver() : failing_receiver());
216     void *sreq = send_nb(stable_pair ? stable_sender()   : failing_sender(),
217                          stable_pair ? m_stable_rkey     : m_failing_rkey);
218     wait(sreq);
219     wait(rreq);
220     EXPECT_EQ(m_sbuf, m_rbuf);
221 }
222 
unmap_memh(ucp_mem_h memh,ucp_context_h context)223 void test_ucp_peer_failure::unmap_memh(ucp_mem_h memh, ucp_context_h context)
224 {
225     ucs_status_t status = ucp_mem_unmap(context, memh);
226     if (status != UCS_OK) {
227         ucs_warn("failed to unmap memory: %s", ucs_status_string(status));
228     }
229 }
230 
get_rkey(ucp_ep_h ep,entity & dst,mem_handle_t & memh,ucs::handle<ucp_rkey_h> & rkey)231 void test_ucp_peer_failure::get_rkey(ucp_ep_h ep, entity& dst, mem_handle_t& memh,
232                                      ucs::handle<ucp_rkey_h>& rkey) {
233     ucp_mem_map_params_t params;
234 
235     memset(&params, 0, sizeof(params));
236     params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
237                         UCP_MEM_MAP_PARAM_FIELD_LENGTH;
238     params.address    = &m_rbuf[0];
239     params.length     = m_rbuf.size();
240 
241     ucp_mem_h ucp_memh;
242     ucs_status_t status = ucp_mem_map(dst.ucph(), &params, &ucp_memh);
243     ASSERT_UCS_OK(status);
244     memh.reset(ucp_memh, unmap_memh, dst.ucph());
245 
246     void *rkey_buffer;
247     size_t rkey_buffer_size;
248     status = ucp_rkey_pack(dst.ucph(), memh, &rkey_buffer, &rkey_buffer_size);
249     ASSERT_UCS_OK(status);
250 
251     ucp_rkey_h ucp_rkey;
252     status = ucp_ep_rkey_unpack(ep, rkey_buffer, &ucp_rkey);
253     ASSERT_UCS_OK(status);
254     rkey.reset(ucp_rkey, ucp_rkey_destroy);
255 
256     ucp_rkey_buffer_release(rkey_buffer);
257 }
258 
set_rkeys()259 void test_ucp_peer_failure::set_rkeys() {
260 
261     if (GetParam().variant & TEST_RMA) {
262         get_rkey(failing_sender(), failing_receiver(), m_failing_memh,
263                  m_failing_rkey);
264         get_rkey(stable_sender(), stable_receiver(), m_stable_memh,
265                  m_stable_rkey);
266     }
267 }
268 
send_cb(void * request,ucs_status_t status)269 void test_ucp_peer_failure::send_cb(void *request, ucs_status_t status)
270 {
271 }
272 
recv_cb(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)273 void test_ucp_peer_failure::recv_cb(void *request, ucs_status_t status,
274                                     ucp_tag_recv_info_t *info)
275 {
276 }
277 
cleanup()278 void test_ucp_peer_failure::cleanup() {
279     m_failing_rkey.reset();
280     m_stable_rkey.reset();
281     m_failing_memh.reset();
282     m_stable_memh.reset();
283     ucp_test::cleanup();
284 }
285 
do_test(size_t msg_size,int pre_msg_count,bool force_close,bool request_must_fail)286 void test_ucp_peer_failure::do_test(size_t msg_size, int pre_msg_count,
287                                     bool force_close, bool request_must_fail)
288 {
289     skip_loopback();
290 
291     m_sbuf.resize(msg_size);
292     m_rbuf.resize(msg_size);
293 
294     /* connect 2 ep's from sender() to 2 receiver entities */
295     create_entity();
296     sender().connect(&stable_receiver(),  get_ep_params(), STABLE_EP_INDEX);
297     sender().connect(&failing_receiver(), get_ep_params(), FAILING_EP_INDEX);
298 
299     set_rkeys();
300 
301     /* Since we don't want to test peer failure on a stable pair
302      * and don't expect EP timeout error on those EPs,
303      * run traffic on a stable pair to connect it */
304     smoke_test(true);
305 
306     if (!(GetParam().variant & FAIL_IMM)) {
307         /* if not fail immediately, run traffic on failing pair to connect it */
308         smoke_test(false);
309     }
310 
311     /* put some sends on the failing pair */
312     std::vector<void*> sreqs_pre;
313     for (int i = 0; i < pre_msg_count; ++i) {
314         progress();
315         void *req = send_nb(failing_sender(), m_failing_rkey);
316         ASSERT_FALSE(UCS_PTR_IS_ERR(req));
317         if (UCS_PTR_IS_PTR(req)) {
318             sreqs_pre.push_back(req);
319         }
320     }
321 
322     EXPECT_EQ(UCS_OK, m_err_status);
323 
324     /* Since UCT/UD EP has a SW implementation of reliablity on which peer
325      * failure mechanism is based, we should set small UCT/UD EP timeout
326      * for UCT/UD EPs for sender's UCP EP to reduce testing time */
327     double prev_ib_ud_timeout = sender().set_ib_ud_timeout(3.);
328 
329     {
330         scoped_log_handler slh(wrap_errors_logger);
331 
332         fail_receiver();
333 
334         void *sreq = send_nb(failing_sender(), m_failing_rkey);
335 
336         while (!m_err_count) {
337             progress();
338         }
339         EXPECT_NE(UCS_OK, m_err_status);
340 
341         if (UCS_PTR_IS_PTR(sreq)) {
342             /* The request may either succeed or fail, even though the data is
343              * not * delivered - depends on when the error is detected on sender
344              * side and if zcopy/bcopy protocol is used. In any case, the
345              * request must complete, and all resources have to be released.
346              */
347             ucs_status_t status = ucp_request_check_status(sreq);
348             EXPECT_NE(UCS_INPROGRESS, status);
349             if (request_must_fail) {
350                 EXPECT_EQ(m_err_status, status);
351             } else {
352                 EXPECT_TRUE((m_err_status == status) || (UCS_OK == status));
353             }
354             ucp_request_release(sreq);
355         }
356 
357         /* Additional sends must fail */
358         void *sreq2 = send_nb(failing_sender(), m_failing_rkey);
359         EXPECT_FALSE(UCS_PTR_IS_PTR(sreq2));
360         EXPECT_EQ(m_err_status, UCS_PTR_STATUS(sreq2));
361 
362         if (force_close) {
363             unsigned allocd_eps_before =
364                     ucs_strided_alloc_inuse_count(&sender().worker()->ep_alloc);
365 
366             ucp_ep_h ep = sender().revoke_ep(0, FAILING_EP_INDEX);
367 
368             m_failing_rkey.reset();
369 
370             void *creq = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FORCE);
371             wait(creq);
372 
373             unsigned allocd_eps_after =
374                     ucs_strided_alloc_inuse_count(&sender().worker()->ep_alloc);
375 
376             if (!(GetParam().variant & FAIL_IMM)) {
377                 EXPECT_LT(allocd_eps_after, allocd_eps_before);
378             }
379         }
380 
381         /* release requests */
382         while (!sreqs_pre.empty()) {
383             void *req = sreqs_pre.back();
384             sreqs_pre.pop_back();
385             EXPECT_NE(UCS_INPROGRESS, ucp_request_test(req, NULL));
386             ucp_request_release(req);
387         }
388     }
389 
390     /* Since we won't test peer failure anymore, reset UCT/UD EP timeout to the
391      * default value to avoid possible UD EP timeout errors under high load */
392     sender().set_ib_ud_timeout(prev_ib_ud_timeout);
393 
394     /* Check workability of stable pair */
395     smoke_test(true);
396 
397     /* Check that TX polling is working well */
398     while (sender().progress());
399 
400     /* Destroy rkeys before destroying the worker (which also destroys the
401      * endpoints) */
402     m_failing_rkey.reset();
403     m_stable_rkey.reset();
404 
405     /* When all requests on sender are done we need to prevent LOCAL_FLUSH
406      * in test teardown. Receiver is killed and doesn't respond on FC requests
407      */
408     sender().destroy_worker();
409 }
410 
UCS_TEST_P(test_ucp_peer_failure, basic) {
    /* 1KB messages, nothing pre-posted, no force-close; the send issued right
     * after the failure may complete either way */
    const size_t msg_size      = UCS_KBYTE;
    const int    pre_msg_count = 0;

    do_test(msg_size, pre_msg_count, false /* force_close */,
            false /* request_must_fail */);
}
417 
UCS_TEST_P(test_ucp_peer_failure, rndv_disable) {
    /* On an endpoint created with peer error handling all rendezvous
     * thresholds are expected to be the maximal size_t value */
    const size_t size_max = std::numeric_limits<size_t>::max();

    sender().connect(&receiver(), get_ep_params(), STABLE_EP_INDEX);
    ucp_ep_h ep = sender().ep();

    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv.am_thresh);
    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv.rma_thresh);
    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv_send_nbr.am_thresh);
    EXPECT_EQ(size_max, ucp_ep_config(ep)->tag.rndv_send_nbr.rma_thresh);
}
427 
428 UCS_TEST_P(test_ucp_peer_failure, zcopy, "ZCOPY_THRESH=1023") {
429     do_test(UCS_KBYTE, /* msg_size */
430             0, /* pre_msg_cnt */
431             false, /* force_close */
432             true /* must_fail */);
433 }
434 
435 UCS_TEST_P(test_ucp_peer_failure, bcopy_multi, "SEG_SIZE?=512", "RC_TM_ENABLE?=n") {
436     do_test(UCS_KBYTE, /* msg_size */
437             0, /* pre_msg_cnt */
438             false, /* force_close */
439             false /* must_fail */);
440 }
441 
442 UCS_TEST_P(test_ucp_peer_failure, force_close, "RC_FC_ENABLE?=n") {
443     do_test(16000, /* msg_size */
444             1000, /* pre_msg_cnt */
445             true, /* force_close */
446             false /* must_fail */);
447 }
448 
449 UCS_TEST_SKIP_COND_P(test_ucp_peer_failure, disable_sync_send,
450                      !(GetParam().variant & TEST_TAG)) {
451     const size_t        max_size = UCS_MBYTE;
452     std::vector<char>   buf(max_size, 0);
453     void                *req;
454 
455     sender().connect(&receiver(), get_ep_params());
456 
457     /* Make sure API is disabled for any size and data type */
458     for (size_t size = 1; size <= max_size; size *= 2) {
459         req = ucp_tag_send_sync_nb(sender().ep(), buf.data(), size, DATATYPE,
460                                    0x111337, NULL);
461         EXPECT_FALSE(UCS_PTR_IS_PTR(req));
462         EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(req));
463 
464         ucp::data_type_desc_t dt_desc(DATATYPE_IOV, buf.data(), size);
465         req = ucp_tag_send_sync_nb(sender().ep(), dt_desc.buf(), dt_desc.count(),
466                                    dt_desc.dt(), 0x111337, NULL);
467         EXPECT_FALSE(UCS_PTR_IS_PTR(req));
468         EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(req));
469     }
470 }
471