1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include "uct_p2p_test.h"
8 extern "C" {
9 #include <ucs/arch/atomic.h>
10 }
11
12 class uct_flush_test : public uct_test {
13 public:
14 static const uint64_t SEED1 = 0x1111111111111111lu;
15 static const uint64_t SEED2 = 0x2222222222222222lu;
16 static const uint64_t SEED3 = 0x3333333333333333lu;
17 static const int AM_ID = 1;
18 static const int AM_ID_CANCEL = 2;
19
20 typedef void (uct_flush_test::* flush_func_t)();
21
22 struct test_req_t {
23 uct_pending_req_t uct;
24 uct_completion_t comp;
25 mapped_buffer *sendbuf;
26 uct_flush_test *test;
27 };
28
init()29 void init() {
30 uct_test::init();
31
32 entity *m_sender = uct_test::create_entity(0);
33 m_entities.push_back(m_sender);
34
35 check_skip_test();
36
37 if (UCT_DEVICE_TYPE_SELF == GetParam()->dev_type) {
38 m_sender->connect(0, *m_sender, 0);
39 } else {
40 entity *m_receiver = uct_test::create_entity(0);
41 m_entities.push_back(m_receiver);
42
43 m_sender->connect(0, *m_receiver, 0);
44 }
45 am_rx_count = 0;
46 m_flush_flags = 0;
47 }
48
pack_cb(void * dest,void * arg)49 static size_t pack_cb(void *dest, void *arg)
50 {
51 const mapped_buffer *sendbuf = (const mapped_buffer *)arg;
52 memcpy(dest, sendbuf->ptr(), sendbuf->length());
53 return sendbuf->length();
54 }
55
blocking_put_bcopy(const mapped_buffer & sendbuf,const mapped_buffer & recvbuf)56 void blocking_put_bcopy(const mapped_buffer &sendbuf,
57 const mapped_buffer &recvbuf)
58 {
59 ssize_t status;
60 for (;;) {
61 status = uct_ep_put_bcopy(sender().ep(0), pack_cb, (void*)&sendbuf,
62 recvbuf.addr(), recvbuf.rkey());
63 if (status >= 0) {
64 return;
65 } else if (status == UCS_ERR_NO_RESOURCE) {
66 progress();
67 continue;
68 } else {
69 ASSERT_UCS_OK((ucs_status_t)status);
70 }
71 }
72 }
73
blocking_am_bcopy(const mapped_buffer & sendbuf)74 void blocking_am_bcopy(const mapped_buffer &sendbuf)
75 {
76 ssize_t status;
77 for (;;) {
78 status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
79 (void*)&sendbuf, 0);
80 if (status >= 0) {
81 return;
82 } else if (status == UCS_ERR_NO_RESOURCE) {
83 progress();
84 continue;
85 } else {
86 ASSERT_UCS_OK((ucs_status_t)status);
87 }
88 }
89 }
90
am_handler(void * arg,void * data,size_t length,unsigned flags)91 static ucs_status_t am_handler(void *arg, void *data, size_t length,
92 unsigned flags)
93 {
94 if (arg == NULL) {
95 /* This is not completely canceled message, drop it */
96 return UCS_OK;
97 }
98 const mapped_buffer *recvbuf = (const mapped_buffer *)arg;
99 memcpy(recvbuf->ptr(), data, ucs_min(length, recvbuf->length()));
100 ucs_atomic_add32(&am_rx_count, 1);
101 return UCS_OK;
102 }
103
am_send_pending(test_req_t * am_req)104 ucs_status_t am_send_pending(test_req_t *am_req)
105 {
106 ssize_t status;
107
108 status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
109 (void*)am_req->sendbuf, 0);
110 if (status >= 0) {
111 --am_req->comp.count;
112 return UCS_OK;
113 } else {
114 return (ucs_status_t)status;
115 }
116 }
117
am_progress(uct_pending_req_t * req)118 static ucs_status_t am_progress(uct_pending_req_t *req)
119 {
120 test_req_t *am_req = ucs_container_of(req, test_req_t, uct);
121 return am_req->test->am_send_pending(am_req);
122 }
123
flush_progress(uct_pending_req_t * req)124 static ucs_status_t flush_progress(uct_pending_req_t *req)
125 {
126 test_req_t *flush_req = ucs_container_of(req, test_req_t, uct);
127 ucs_status_t status;
128
129 status = uct_ep_flush(flush_req->test->sender().ep(0), 0,
130 &flush_req->comp);
131 if (status == UCS_OK) {
132 --flush_req->comp.count;
133 return UCS_OK;
134 } else if (status == UCS_INPROGRESS) {
135 return UCS_OK;
136 } else if (status == UCS_ERR_NO_RESOURCE) {
137 return UCS_ERR_NO_RESOURCE;
138 } else {
139 UCS_TEST_ABORT("Error: " << ucs_status_string(status));
140 }
141 }
142
test_flush_put_bcopy(flush_func_t flush)143 void test_flush_put_bcopy(flush_func_t flush) {
144 const size_t length = 8;
145 mapped_buffer sendbuf(length, SEED1, sender());
146 mapped_buffer recvbuf(length, SEED2, receiver());
147 sendbuf.pattern_fill(SEED3);
148 blocking_put_bcopy(sendbuf, recvbuf);
149 (this->*flush)();
150
151 if (is_flush_cancel()) {
152 return;
153 }
154
155 recvbuf.pattern_check(SEED3);
156 }
157
wait_am(unsigned count)158 void wait_am(unsigned count) {
159 while (am_rx_count < count) {
160 progress();
161 sched_yield();
162 }
163 }
164
test_flush_am_zcopy(flush_func_t flush,bool destroy_ep)165 void test_flush_am_zcopy(flush_func_t flush, bool destroy_ep) {
166 const size_t length = 8;
167 if (is_flush_cancel()) {
168 ASSERT_TRUE(destroy_ep);
169 }
170 mapped_buffer sendbuf(length, SEED1, sender());
171 mapped_buffer recvbuf(length, SEED2, receiver());
172 sendbuf.pattern_fill(SEED3);
173
174 uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
175 is_flush_cancel() ? NULL : &recvbuf,
176 UCT_CB_FLAG_ASYNC);
177
178 uct_completion_t zcomp;
179 zcomp.count = 2;
180 zcomp.func = NULL;
181
182 ucs_status_t status;
183 UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(),
184 sendbuf.memh(),
185 sender().iface_attr().cap.am.max_iov);
186 do {
187 status = uct_ep_am_zcopy(sender().ep(0), get_am_id(), NULL, 0, iov,
188 iovcnt, 0, &zcomp);
189 progress();
190 } while (status == UCS_ERR_NO_RESOURCE);
191 ASSERT_UCS_OK_OR_INPROGRESS(status);
192 if (status == UCS_OK) {
193 --zcomp.count;
194 }
195
196 (this->*flush)();
197
198 EXPECT_EQ(1, zcomp.count); /* Zero copy op should be already completed
199 since flush returned */
200
201 if (destroy_ep) {
202 sender().destroy_ep(0);
203 }
204
205 if (is_flush_cancel()) {
206 return;
207 }
208
209 wait_am(1);
210
211 uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);
212
213 recvbuf.pattern_check(SEED3);
214 }
215
test_flush_am_disconnect(flush_func_t flush,bool destroy_ep)216 void test_flush_am_disconnect(flush_func_t flush, bool destroy_ep) {
217 const size_t length = 8;
218 if (is_flush_cancel()) {
219 ASSERT_TRUE(destroy_ep);
220 }
221 mapped_buffer sendbuf(length, SEED1, sender());
222 mapped_buffer recvbuf(length, SEED2, receiver());
223 sendbuf.pattern_fill(SEED3);
224
225 uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
226 is_flush_cancel() ? NULL : &recvbuf,
227 UCT_CB_FLAG_ASYNC);
228 blocking_am_bcopy(sendbuf);
229 (this->*flush)();
230
231 if (destroy_ep) {
232 sender().destroy_ep(0);
233 }
234
235 if (is_flush_cancel()) {
236 return;
237 }
238
239 wait_am(1);
240 uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);
241
242 recvbuf.pattern_check(SEED3);
243 }
244
flush_ep_no_comp()245 void flush_ep_no_comp() {
246 ucs_status_t status;
247 do {
248 progress();
249 status = uct_ep_flush(sender().ep(0), m_flush_flags, NULL);
250 } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS));
251 ASSERT_UCS_OK(status);
252 }
253
flush_iface_no_comp()254 void flush_iface_no_comp() {
255 ucs_status_t status;
256 do {
257 progress();
258 status = uct_iface_flush(sender().iface(), m_flush_flags, NULL);
259 } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS));
260 ASSERT_UCS_OK(status);
261 }
262
flush_ep_nb()263 void flush_ep_nb() {
264 uct_completion_t comp;
265 ucs_status_t status;
266 comp.count = 2;
267 comp.func = NULL;
268 do {
269 progress();
270 status = uct_ep_flush(sender().ep(0), m_flush_flags, &comp);
271 } while (status == UCS_ERR_NO_RESOURCE);
272 ASSERT_UCS_OK_OR_INPROGRESS(status);
273 if (status == UCS_OK) {
274 return;
275 }
276 /* coverity[loop_condition] */
277 while (comp.count != 1) {
278 progress();
279 }
280 }
281
282 void test_flush_am_pending(flush_func_t flush, bool destroy_ep);
283
284 protected:
sender()285 uct_test::entity& sender() {
286 return **m_entities.begin();
287 }
288
receiver()289 uct_test::entity& receiver() {
290 return **(m_entities.end() - 1);
291 }
292
is_flush_cancel() const293 bool is_flush_cancel() const {
294 return (m_flush_flags & UCT_FLUSH_FLAG_CANCEL);
295 }
296
get_am_id() const297 uint8_t get_am_id() const {
298 return is_flush_cancel() ? AM_ID_CANCEL : AM_ID;
299 }
300
301 static uint32_t am_rx_count;
302 unsigned m_flush_flags;
303 };
304
305 uint32_t uct_flush_test::am_rx_count = 0;
306
test_flush_am_pending(flush_func_t flush,bool destroy_ep)307 void uct_flush_test::test_flush_am_pending(flush_func_t flush, bool destroy_ep)
308 {
309 if (is_flush_cancel()) {
310 ASSERT_TRUE(destroy_ep);
311 }
312 const size_t length = 8;
313 mapped_buffer sendbuf(length, SEED1, sender());
314 mapped_buffer recvbuf(length, SEED2, receiver());
315 sendbuf.pattern_fill(SEED3);
316
317 uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
318 is_flush_cancel() ? NULL : &recvbuf,
319 UCT_CB_FLAG_ASYNC);
320
321 /* Send until resources are exhausted or timeout in 1sec*/
322 unsigned count = 0;
323 ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(1.0);
324 ssize_t packed_len;
325 for (;;) {
326 packed_len = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
327 (void*)&sendbuf, 0);
328 if (packed_len == UCS_ERR_NO_RESOURCE) {
329 break;
330 }
331 if (ucs_get_time() > loop_end_limit) {
332 ++count;
333 break;
334 }
335
336 if (packed_len >= 0) {
337 ++count;
338 } else {
339 ASSERT_UCS_OK((ucs_status_t)packed_len);
340 }
341 }
342
343 /* Queue some pending AMs */
344 ucs_status_t status;
345 std::vector<test_req_t> reqs;
346 reqs.resize(10);
347 for (std::vector<test_req_t>::iterator it = reqs.begin(); it != reqs.end();) {
348 it->sendbuf = &sendbuf;
349 it->test = this;
350 it->uct.func = am_progress;
351 it->comp.count = 2;
352 it->comp.func = NULL;
353 status = uct_ep_pending_add(sender().ep(0), &it->uct, 0);
354 if (UCS_ERR_BUSY == status) {
355 /* User advised to retry the send. It means no requests added
356 * to the queue
357 */
358 it = reqs.erase(it);
359 status = UCS_OK;
360 } else {
361 ++count;
362 ++it;
363 }
364 ASSERT_UCS_OK(status);
365 }
366
367 /* Try to start a flush */
368 test_req_t flush_req;
369 flush_req.comp.count = 2;
370 flush_req.comp.func = NULL;
371
372 for (;;) {
373 status = uct_ep_flush(sender().ep(0), m_flush_flags, &flush_req.comp);
374 if (status == UCS_OK) {
375 --flush_req.comp.count;
376 } else if (status == UCS_ERR_NO_RESOURCE) {
377 /* If flush returned NO_RESOURCE, add to pending must succeed */
378 flush_req.test = this;
379 flush_req.uct.func = flush_progress;
380 status = uct_ep_pending_add(sender().ep(0), &flush_req.uct, 0);
381 if (status == UCS_ERR_BUSY) {
382 continue;
383 }
384 EXPECT_EQ(UCS_OK, status);
385 } else if (status == UCS_INPROGRESS) {
386 } else {
387 UCS_TEST_ABORT("failed to flush ep: " << ucs_status_string(status));
388 }
389 break;
390 }
391
392 /* timeout used to prevent test hung */
393 wait_for_value(&flush_req.comp.count, 1, true, 60.0);
394 EXPECT_EQ(1, flush_req.comp.count);
395
396 while (!reqs.empty()) {
397 if (is_flush_cancel()) {
398 EXPECT_EQ(2, reqs.back().comp.count);
399 } else {
400 EXPECT_EQ(1, reqs.back().comp.count);
401 }
402 reqs.pop_back();
403 }
404
405 if (!is_flush_cancel()) {
406 wait_am(count);
407 }
408
409 if (destroy_ep) {
410 sender().destroy_ep(0);
411 }
412
413 if (is_flush_cancel()) {
414 return;
415 }
416
417 uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);
418
419 recvbuf.pattern_check(SEED3);
420 }
421
422 UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_no_comp,
423 !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
424 am_rx_count = 0;
425 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
426
427 test_flush_put_bcopy(&uct_flush_test::flush_ep_no_comp);
428
429 if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
430 am_rx_count = 0;
431 m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
432 test_flush_put_bcopy(&uct_flush_test::flush_ep_no_comp);
433 }
434 }
435
436 UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_iface_no_comp,
437 !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
438 test_flush_put_bcopy(&uct_flush_test::flush_iface_no_comp);
439 }
440
441 UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_nb,
442 !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
443 am_rx_count = 0;
444 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
445
446 test_flush_put_bcopy(&uct_flush_test::flush_ep_nb);
447
448 if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
449 am_rx_count = 0;
450 m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
451 test_flush_put_bcopy(&uct_flush_test::flush_ep_nb);
452 }
453 }
454
455 UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_no_comp,
456 !check_caps(UCT_IFACE_FLAG_AM_ZCOPY),
457 "UD_TIMER_TICK?=100ms") {
458 am_rx_count = 0;
459 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
460
461 test_flush_am_zcopy(&uct_flush_test::flush_ep_no_comp, false);
462
463 if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
464 am_rx_count = 0;
465 m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
466 test_flush_am_zcopy(&uct_flush_test::flush_ep_no_comp, true);
467 }
468 }
469
470 UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_iface_no_comp,
471 !check_caps(UCT_IFACE_FLAG_AM_ZCOPY),
472 "UD_TIMER_TICK?=100ms") {
473 test_flush_am_zcopy(&uct_flush_test::flush_iface_no_comp, true);
474 }
475
476 UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_nb,
477 !check_caps(UCT_IFACE_FLAG_AM_ZCOPY),
478 "UD_TIMER_TICK?=100ms") {
479 am_rx_count = 0;
480 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
481
482 test_flush_am_zcopy(&uct_flush_test::flush_ep_nb, false);
483
484 if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
485 am_rx_count = 0;
486 m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
487 test_flush_am_zcopy(&uct_flush_test::flush_ep_nb, true);
488 }
489 }
490
491 UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_no_comp,
492 !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
493 am_rx_count = 0;
494 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
495
496 test_flush_am_disconnect(&uct_flush_test::flush_ep_no_comp, false);
497
498 if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
499 am_rx_count = 0;
500 m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
501 test_flush_am_disconnect(&uct_flush_test::flush_ep_no_comp, true);
502 }
503 }
504
505 UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_iface_no_comp,
506 !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
507 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
508 test_flush_am_disconnect(&uct_flush_test::flush_iface_no_comp, true);
509 }
510
511 UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_nb,
512 !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
513 am_rx_count = 0;
514 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
515
516 test_flush_am_disconnect(&uct_flush_test::flush_ep_nb, false);
517
518 if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
519 am_rx_count = 0;
520 m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
521 test_flush_am_disconnect(&uct_flush_test::flush_ep_nb, true);
522 }
523 }
524
525 UCS_TEST_SKIP_COND_P(uct_flush_test, am_pending_flush_nb,
526 !check_caps(UCT_IFACE_FLAG_AM_BCOPY |
527 UCT_IFACE_FLAG_PENDING)) {
528 am_rx_count = 0;
529 m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
530
531 test_flush_am_pending(&uct_flush_test::flush_ep_nb, false);
532
533 if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
534 am_rx_count = 0;
535 m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
536 test_flush_am_pending(&uct_flush_test::flush_ep_nb, true);
537 }
538 }
539
540 UCT_INSTANTIATE_TEST_CASE(uct_flush_test)
541