1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #include "uct_p2p_test.h"
8 extern "C" {
9 #include <ucs/arch/atomic.h>
10 }
11 
12 class uct_flush_test : public uct_test {
13 public:
14     static const uint64_t SEED1 = 0x1111111111111111lu;
15     static const uint64_t SEED2 = 0x2222222222222222lu;
16     static const uint64_t SEED3 = 0x3333333333333333lu;
17     static const int      AM_ID         = 1;
18     static const int      AM_ID_CANCEL  = 2;
19 
20     typedef void (uct_flush_test::* flush_func_t)();
21 
22     struct test_req_t {
23         uct_pending_req_t  uct;
24         uct_completion_t   comp;
25         mapped_buffer      *sendbuf;
26         uct_flush_test     *test;
27     };
28 
init()29     void init() {
30         uct_test::init();
31 
32         entity *m_sender = uct_test::create_entity(0);
33         m_entities.push_back(m_sender);
34 
35         check_skip_test();
36 
37         if (UCT_DEVICE_TYPE_SELF == GetParam()->dev_type) {
38             m_sender->connect(0, *m_sender, 0);
39         } else {
40             entity *m_receiver = uct_test::create_entity(0);
41             m_entities.push_back(m_receiver);
42 
43             m_sender->connect(0, *m_receiver, 0);
44         }
45         am_rx_count   = 0;
46         m_flush_flags = 0;
47     }
48 
pack_cb(void * dest,void * arg)49     static size_t pack_cb(void *dest, void *arg)
50     {
51         const mapped_buffer *sendbuf = (const mapped_buffer *)arg;
52         memcpy(dest, sendbuf->ptr(), sendbuf->length());
53         return sendbuf->length();
54     }
55 
blocking_put_bcopy(const mapped_buffer & sendbuf,const mapped_buffer & recvbuf)56     void blocking_put_bcopy(const mapped_buffer &sendbuf,
57                             const mapped_buffer &recvbuf)
58     {
59         ssize_t status;
60          for (;;) {
61              status = uct_ep_put_bcopy(sender().ep(0), pack_cb, (void*)&sendbuf,
62                                        recvbuf.addr(), recvbuf.rkey());
63              if (status >= 0) {
64                  return;
65              } else if (status == UCS_ERR_NO_RESOURCE) {
66                  progress();
67                  continue;
68              } else {
69                  ASSERT_UCS_OK((ucs_status_t)status);
70              }
71          }
72     }
73 
blocking_am_bcopy(const mapped_buffer & sendbuf)74     void blocking_am_bcopy(const mapped_buffer &sendbuf)
75     {
76          ssize_t status;
77          for (;;) {
78              status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
79                                       (void*)&sendbuf, 0);
80              if (status >= 0) {
81                  return;
82              } else if (status == UCS_ERR_NO_RESOURCE) {
83                  progress();
84                  continue;
85              } else {
86                  ASSERT_UCS_OK((ucs_status_t)status);
87              }
88          }
89     }
90 
am_handler(void * arg,void * data,size_t length,unsigned flags)91     static ucs_status_t am_handler(void *arg, void *data, size_t length,
92                                    unsigned flags)
93     {
94         if (arg == NULL) {
95             /* This is not completely canceled message, drop it */
96             return UCS_OK;
97         }
98         const mapped_buffer *recvbuf = (const mapped_buffer *)arg;
99         memcpy(recvbuf->ptr(), data, ucs_min(length, recvbuf->length()));
100         ucs_atomic_add32(&am_rx_count, 1);
101         return UCS_OK;
102     }
103 
am_send_pending(test_req_t * am_req)104     ucs_status_t am_send_pending(test_req_t *am_req)
105     {
106         ssize_t status;
107 
108         status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
109                                  (void*)am_req->sendbuf, 0);
110         if (status >= 0) {
111             --am_req->comp.count;
112             return UCS_OK;
113         } else {
114             return (ucs_status_t)status;
115         }
116     }
117 
am_progress(uct_pending_req_t * req)118     static ucs_status_t am_progress(uct_pending_req_t *req)
119     {
120         test_req_t *am_req = ucs_container_of(req, test_req_t, uct);
121         return am_req->test->am_send_pending(am_req);
122     }
123 
flush_progress(uct_pending_req_t * req)124     static ucs_status_t flush_progress(uct_pending_req_t *req)
125     {
126         test_req_t *flush_req = ucs_container_of(req, test_req_t, uct);
127         ucs_status_t status;
128 
129         status = uct_ep_flush(flush_req->test->sender().ep(0), 0,
130                               &flush_req->comp);
131         if (status == UCS_OK) {
132             --flush_req->comp.count;
133             return UCS_OK;
134         } else if (status == UCS_INPROGRESS) {
135             return UCS_OK;
136         } else if (status == UCS_ERR_NO_RESOURCE) {
137             return UCS_ERR_NO_RESOURCE;
138         } else {
139             UCS_TEST_ABORT("Error: " << ucs_status_string(status));
140         }
141     }
142 
test_flush_put_bcopy(flush_func_t flush)143     void test_flush_put_bcopy(flush_func_t flush) {
144         const size_t length = 8;
145         mapped_buffer sendbuf(length, SEED1, sender());
146         mapped_buffer recvbuf(length, SEED2, receiver());
147         sendbuf.pattern_fill(SEED3);
148         blocking_put_bcopy(sendbuf, recvbuf);
149         (this->*flush)();
150 
151         if (is_flush_cancel()) {
152             return;
153         }
154 
155         recvbuf.pattern_check(SEED3);
156     }
157 
wait_am(unsigned count)158     void wait_am(unsigned count) {
159         while (am_rx_count < count) {
160             progress();
161             sched_yield();
162         }
163     }
164 
test_flush_am_zcopy(flush_func_t flush,bool destroy_ep)165     void test_flush_am_zcopy(flush_func_t flush, bool destroy_ep) {
166         const size_t length = 8;
167         if (is_flush_cancel()) {
168             ASSERT_TRUE(destroy_ep);
169         }
170         mapped_buffer sendbuf(length, SEED1, sender());
171         mapped_buffer recvbuf(length, SEED2, receiver());
172         sendbuf.pattern_fill(SEED3);
173 
174         uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
175                                  is_flush_cancel() ? NULL : &recvbuf,
176                                  UCT_CB_FLAG_ASYNC);
177 
178         uct_completion_t zcomp;
179         zcomp.count = 2;
180         zcomp.func  = NULL;
181 
182         ucs_status_t status;
183         UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(),
184                                 sendbuf.memh(),
185                                 sender().iface_attr().cap.am.max_iov);
186         do {
187             status = uct_ep_am_zcopy(sender().ep(0), get_am_id(), NULL, 0, iov,
188                                      iovcnt, 0, &zcomp);
189             progress();
190         } while (status == UCS_ERR_NO_RESOURCE);
191         ASSERT_UCS_OK_OR_INPROGRESS(status);
192         if (status == UCS_OK) {
193             --zcomp.count;
194         }
195 
196         (this->*flush)();
197 
198         EXPECT_EQ(1, zcomp.count); /* Zero copy op should be already completed
199                                       since flush returned */
200 
201         if (destroy_ep) {
202             sender().destroy_ep(0);
203         }
204 
205         if (is_flush_cancel()) {
206             return;
207         }
208 
209         wait_am(1);
210 
211         uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);
212 
213         recvbuf.pattern_check(SEED3);
214     }
215 
test_flush_am_disconnect(flush_func_t flush,bool destroy_ep)216     void test_flush_am_disconnect(flush_func_t flush, bool destroy_ep) {
217         const size_t length = 8;
218         if (is_flush_cancel()) {
219             ASSERT_TRUE(destroy_ep);
220         }
221         mapped_buffer sendbuf(length, SEED1, sender());
222         mapped_buffer recvbuf(length, SEED2, receiver());
223         sendbuf.pattern_fill(SEED3);
224 
225         uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
226                                  is_flush_cancel() ? NULL : &recvbuf,
227                                  UCT_CB_FLAG_ASYNC);
228         blocking_am_bcopy(sendbuf);
229         (this->*flush)();
230 
231         if (destroy_ep) {
232             sender().destroy_ep(0);
233         }
234 
235         if (is_flush_cancel()) {
236             return;
237         }
238 
239         wait_am(1);
240         uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);
241 
242         recvbuf.pattern_check(SEED3);
243     }
244 
flush_ep_no_comp()245     void flush_ep_no_comp() {
246         ucs_status_t status;
247         do {
248             progress();
249             status = uct_ep_flush(sender().ep(0), m_flush_flags, NULL);
250         } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS));
251         ASSERT_UCS_OK(status);
252     }
253 
flush_iface_no_comp()254     void flush_iface_no_comp() {
255         ucs_status_t status;
256         do {
257             progress();
258             status = uct_iface_flush(sender().iface(), m_flush_flags, NULL);
259         } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS));
260         ASSERT_UCS_OK(status);
261     }
262 
flush_ep_nb()263     void flush_ep_nb() {
264         uct_completion_t comp;
265         ucs_status_t status;
266         comp.count = 2;
267         comp.func  = NULL;
268         do {
269             progress();
270             status = uct_ep_flush(sender().ep(0), m_flush_flags, &comp);
271         } while (status == UCS_ERR_NO_RESOURCE);
272         ASSERT_UCS_OK_OR_INPROGRESS(status);
273         if (status == UCS_OK) {
274             return;
275         }
276         /* coverity[loop_condition] */
277         while (comp.count != 1) {
278             progress();
279         }
280     }
281 
282     void test_flush_am_pending(flush_func_t flush, bool destroy_ep);
283 
284 protected:
sender()285     uct_test::entity& sender() {
286         return **m_entities.begin();
287     }
288 
receiver()289     uct_test::entity& receiver() {
290         return **(m_entities.end() - 1);
291     }
292 
is_flush_cancel() const293     bool is_flush_cancel() const {
294         return (m_flush_flags & UCT_FLUSH_FLAG_CANCEL);
295     }
296 
get_am_id() const297     uint8_t get_am_id() const {
298         return is_flush_cancel() ? AM_ID_CANCEL : AM_ID;
299     }
300 
301     static uint32_t am_rx_count;
302     unsigned        m_flush_flags;
303 };
304 
305 uint32_t uct_flush_test::am_rx_count = 0;
306 
test_flush_am_pending(flush_func_t flush,bool destroy_ep)307 void uct_flush_test::test_flush_am_pending(flush_func_t flush, bool destroy_ep)
308 {
309      if (is_flush_cancel()) {
310          ASSERT_TRUE(destroy_ep);
311      }
312      const size_t length = 8;
313      mapped_buffer sendbuf(length, SEED1, sender());
314      mapped_buffer recvbuf(length, SEED2, receiver());
315      sendbuf.pattern_fill(SEED3);
316 
317      uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
318                               is_flush_cancel() ? NULL : &recvbuf,
319                               UCT_CB_FLAG_ASYNC);
320 
321      /* Send until resources are exhausted or timeout in 1sec*/
322      unsigned count = 0;
323      ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(1.0);
324      ssize_t packed_len;
325      for (;;) {
326          packed_len = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
327                                       (void*)&sendbuf, 0);
328          if (packed_len == UCS_ERR_NO_RESOURCE) {
329              break;
330          }
331          if (ucs_get_time() > loop_end_limit) {
332              ++count;
333              break;
334          }
335 
336          if (packed_len >= 0) {
337              ++count;
338          } else {
339              ASSERT_UCS_OK((ucs_status_t)packed_len);
340          }
341      }
342 
343      /* Queue some pending AMs */
344      ucs_status_t status;
345      std::vector<test_req_t> reqs;
346      reqs.resize(10);
347      for (std::vector<test_req_t>::iterator it = reqs.begin(); it != reqs.end();) {
348          it->sendbuf    = &sendbuf;
349          it->test       = this;
350          it->uct.func   = am_progress;
351          it->comp.count = 2;
352          it->comp.func  = NULL;
353          status = uct_ep_pending_add(sender().ep(0), &it->uct, 0);
354          if (UCS_ERR_BUSY == status) {
355              /* User advised to retry the send. It means no requests added
356               * to the queue
357               */
358              it = reqs.erase(it);
359              status = UCS_OK;
360          } else {
361              ++count;
362              ++it;
363          }
364          ASSERT_UCS_OK(status);
365      }
366 
367      /* Try to start a flush */
368      test_req_t flush_req;
369      flush_req.comp.count = 2;
370      flush_req.comp.func  = NULL;
371 
372      for (;;) {
373          status = uct_ep_flush(sender().ep(0), m_flush_flags, &flush_req.comp);
374          if (status == UCS_OK) {
375              --flush_req.comp.count;
376          } else if (status == UCS_ERR_NO_RESOURCE) {
377              /* If flush returned NO_RESOURCE, add to pending must succeed */
378              flush_req.test      = this;
379              flush_req.uct.func  = flush_progress;
380              status = uct_ep_pending_add(sender().ep(0), &flush_req.uct, 0);
381              if (status == UCS_ERR_BUSY) {
382                  continue;
383              }
384              EXPECT_EQ(UCS_OK, status);
385          } else if (status == UCS_INPROGRESS) {
386          } else {
387              UCS_TEST_ABORT("failed to flush ep: " << ucs_status_string(status));
388          }
389          break;
390      }
391 
392      /* timeout used to prevent test hung */
393      wait_for_value(&flush_req.comp.count, 1, true, 60.0);
394      EXPECT_EQ(1, flush_req.comp.count);
395 
396      while (!reqs.empty()) {
397          if (is_flush_cancel()) {
398             EXPECT_EQ(2, reqs.back().comp.count);
399          } else {
400             EXPECT_EQ(1, reqs.back().comp.count);
401          }
402          reqs.pop_back();
403      }
404 
405      if (!is_flush_cancel()) {
406         wait_am(count);
407      }
408 
409      if (destroy_ep) {
410         sender().destroy_ep(0);
411      }
412 
413      if (is_flush_cancel()) {
414          return;
415      }
416 
417      uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);
418 
419      recvbuf.pattern_check(SEED3);
420 }
421 
422 UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_no_comp,
423                      !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
424     am_rx_count   = 0;
425     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
426 
427     test_flush_put_bcopy(&uct_flush_test::flush_ep_no_comp);
428 
429     if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
430         am_rx_count    = 0;
431         m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
432         test_flush_put_bcopy(&uct_flush_test::flush_ep_no_comp);
433     }
434 }
435 
436 UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_iface_no_comp,
437                      !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
438     test_flush_put_bcopy(&uct_flush_test::flush_iface_no_comp);
439 }
440 
441 UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_nb,
442                      !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
443     am_rx_count   = 0;
444     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
445 
446     test_flush_put_bcopy(&uct_flush_test::flush_ep_nb);
447 
448     if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
449         am_rx_count    = 0;
450         m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
451         test_flush_put_bcopy(&uct_flush_test::flush_ep_nb);
452     }
453 }
454 
455 UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_no_comp,
456                      !check_caps(UCT_IFACE_FLAG_AM_ZCOPY),
457                      "UD_TIMER_TICK?=100ms") {
458     am_rx_count   = 0;
459     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
460 
461     test_flush_am_zcopy(&uct_flush_test::flush_ep_no_comp, false);
462 
463     if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
464         am_rx_count    = 0;
465         m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
466         test_flush_am_zcopy(&uct_flush_test::flush_ep_no_comp, true);
467     }
468 }
469 
470 UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_iface_no_comp,
471                      !check_caps(UCT_IFACE_FLAG_AM_ZCOPY),
472                      "UD_TIMER_TICK?=100ms") {
473     test_flush_am_zcopy(&uct_flush_test::flush_iface_no_comp, true);
474 }
475 
476 UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_nb,
477                      !check_caps(UCT_IFACE_FLAG_AM_ZCOPY),
478                      "UD_TIMER_TICK?=100ms") {
479     am_rx_count   = 0;
480     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
481 
482     test_flush_am_zcopy(&uct_flush_test::flush_ep_nb, false);
483 
484     if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
485         am_rx_count    = 0;
486         m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
487         test_flush_am_zcopy(&uct_flush_test::flush_ep_nb, true);
488     }
489 }
490 
491 UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_no_comp,
492                      !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
493     am_rx_count   = 0;
494     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
495 
496     test_flush_am_disconnect(&uct_flush_test::flush_ep_no_comp, false);
497 
498     if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
499         am_rx_count    = 0;
500         m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
501         test_flush_am_disconnect(&uct_flush_test::flush_ep_no_comp, true);
502     }
503 }
504 
505 UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_iface_no_comp,
506                      !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
507     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
508     test_flush_am_disconnect(&uct_flush_test::flush_iface_no_comp, true);
509 }
510 
511 UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_nb,
512                      !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
513     am_rx_count   = 0;
514     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
515 
516     test_flush_am_disconnect(&uct_flush_test::flush_ep_nb, false);
517 
518     if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
519         am_rx_count    = 0;
520         m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
521         test_flush_am_disconnect(&uct_flush_test::flush_ep_nb, true);
522     }
523 }
524 
525 UCS_TEST_SKIP_COND_P(uct_flush_test, am_pending_flush_nb,
526                      !check_caps(UCT_IFACE_FLAG_AM_BCOPY |
527                                  UCT_IFACE_FLAG_PENDING)) {
528     am_rx_count   = 0;
529     m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
530 
531     test_flush_am_pending(&uct_flush_test::flush_ep_nb, false);
532 
533     if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
534         am_rx_count    = 0;
535         m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
536         test_flush_am_pending(&uct_flush_test::flush_ep_nb, true);
537     }
538 }
539 
/* Register the uct_flush_test cases with the test framework.
 * NOTE(review): presumably instantiated once per available transport/device -
 * confirm against the UCT_INSTANTIATE_TEST_CASE definition. */
UCT_INSTANTIATE_TEST_CASE(uct_flush_test)
541