1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #include "ucp_test.h"
8 
9 #include <algorithm>
10 #include <sys/epoll.h>
11 #include <sys/poll.h>
12 
13 
14 class test_ucp_wakeup : public ucp_test {
15 public:
get_ctx_params()16     static ucp_params_t get_ctx_params() {
17         ucp_params_t params = ucp_test::get_ctx_params();
18         params.features |= UCP_FEATURE_TAG | UCP_FEATURE_WAKEUP;
19         return params;
20     }
21 
22 protected:
send_completion(void * request,ucs_status_t status)23     static void send_completion(void *request, ucs_status_t status) {
24         ++comp_cntr;
25     }
26 
recv_completion(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)27     static void recv_completion(void *request, ucs_status_t status,
28                                 ucp_tag_recv_info_t *info) {
29         ++comp_cntr;
30     }
31 
wait(void * req)32     void wait(void *req) {
33         do {
34             progress();
35         } while (!ucp_request_is_completed(req));
36         ucp_request_release(req);
37     }
38 
arm(ucp_worker_h worker)39     void arm(ucp_worker_h worker) {
40         ucs_status_t status;
41         do {
42             status = ucp_worker_arm(worker);
43         } while (UCS_ERR_BUSY == status);
44         ASSERT_EQ(UCS_OK, status);
45     }
46 
47     static size_t comp_cntr;
48 };
49 
50 size_t test_ucp_wakeup::comp_cntr = 0;
51 
/*
 * Send one tagged message, then receive it while blocking in poll() on the
 * receiver worker's event fd whenever the worker reports no progress and
 * could be armed.
 */
UCS_TEST_P(test_ucp_wakeup, efd)
{
    const ucp_datatype_t DATATYPE = ucp_dt_make_contig(1);
    const uint64_t TAG = 0xdeadbeef;
    ucp_worker_h recv_worker;
    int recv_efd;
    void *req;

    sender().connect(&receiver(), get_ep_params());

    recv_worker = receiver().worker();
    ASSERT_UCS_OK(ucp_worker_get_efd(recv_worker, &recv_efd));

    uint64_t send_data = 0x12121212;
    req = ucp_tag_send_nb(sender().ep(), &send_data, sizeof(send_data), DATATYPE,
                          TAG, send_completion);
    if (UCS_PTR_IS_PTR(req)) {
        wait(req);
    } else {
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    }

    uint64_t recv_data = 0;
    req = ucp_tag_recv_nb(recv_worker, &recv_data, sizeof(recv_data),
                          DATATYPE, TAG, (ucp_tag_t)-1, recv_completion);
    /* Only a real request handle may be queried for completion and released;
     * stop here if the receive could not be posted */
    ASSERT_TRUE(UCS_PTR_IS_PTR(req))
        << "ucp_tag_recv_nb() failed: "
        << ucs_status_string(UCS_PTR_STATUS(req));

    while (!ucp_request_is_completed(req)) {

        if (ucp_worker_progress(recv_worker)) {
            /* Got some receive events, check request */
            continue;
        }

        ucs_status_t status = ucp_worker_arm(recv_worker);
        if (status == UCS_ERR_BUSY) {
            /* Could not arm, poll again */
            ucp_worker_progress(recv_worker);
            continue;
        }
        ASSERT_UCS_OK(status);

        /* Worker is armed: block on its event fd, restarting on EINTR */
        int ret;
        do {
            struct pollfd pollfd;
            pollfd.events = POLLIN;
            pollfd.fd     = recv_efd;
            ret = poll(&pollfd, 1, -1);
        } while ((ret < 0) && (errno == EINTR));
        if (ret < 0) {
            UCS_TEST_MESSAGE << "poll() failed: " << strerror(errno);
        }
        ASSERT_EQ(1, ret);

        /* With an event reported on the fd, arming again is expected to
         * return UCS_ERR_BUSY */
        EXPECT_EQ(UCS_ERR_BUSY, ucp_worker_arm(recv_worker));
    }

    ucp_request_release(req);

    flush_worker(sender());
    EXPECT_EQ(send_data, recv_data);
}
111 
112 /* This test doesn't progress receiver's worker, while
113  * waiting for the events on a sender's worker fd. So,
114  * this causes the hang due to lack of the progress during
115  * TCP CM message exchange (TCP doesn't have an async progress
116  * for such events)
117  * TODO: add async progress for TCP connections */
118 UCS_TEST_SKIP_COND_P(test_ucp_wakeup, tx_wait,
119                      has_transport("tcp"), "ZCOPY_THRESH=10000")
120 {
121     const ucp_datatype_t DATATYPE = ucp_dt_make_contig(1);
122     const size_t COUNT            = 20000;
123     const uint64_t TAG            = 0xdeadbeef;
124     std::string send_data(COUNT, '2'), recv_data(COUNT, '1');
125     void *sreq, *rreq;
126 
127     sender().connect(&receiver(), get_ep_params());
128 
129     rreq = ucp_tag_recv_nb(receiver().worker(), &recv_data[0], COUNT, DATATYPE,
130                            TAG, (ucp_tag_t)-1, recv_completion);
131 
132     sreq = ucp_tag_send_nb(sender().ep(), &send_data[0], COUNT, DATATYPE, TAG,
133                            send_completion);
134 
135     if (UCS_PTR_IS_PTR(sreq)) {
136         /* wait for send completion */
137         do {
138             ucp_worker_wait(sender().worker());
139             while (progress());
140         } while (!ucp_request_is_completed(sreq));
141         ucp_request_release(sreq);
142     } else {
143         ASSERT_UCS_OK(UCS_PTR_STATUS(sreq));
144     }
145 
146     wait(rreq);
147 
148     EXPECT_EQ(send_data, recv_data);
149 }
150 
/*
 * Check how ucp_worker_signal() and ucp_worker_arm() interact, as observed
 * through non-blocking poll() calls on the sender worker's event fd.
 */
UCS_TEST_P(test_ucp_wakeup, signal)
{
    ucp_worker_h worker = sender().worker();
    int efd;

    ASSERT_UCS_OK(ucp_worker_get_efd(worker, &efd));

    struct pollfd pfd;
    pfd.fd     = efd;
    pfd.events = POLLIN;

    /* No event is expected before anything was signaled */
    EXPECT_EQ(0, poll(&pfd, 1, 0));

    /* One signal on an armed worker: fd becomes readable until re-armed */
    arm(worker);
    ASSERT_UCS_OK(ucp_worker_signal(worker));
    EXPECT_EQ(1, poll(&pfd, 1, 0));
    arm(worker);
    EXPECT_EQ(0, poll(&pfd, 1, 0));

    /* Two signals in a row: still a single re-arm is expected to clear them */
    ASSERT_UCS_OK(ucp_worker_signal(worker));
    ASSERT_UCS_OK(ucp_worker_signal(worker));
    EXPECT_EQ(1, poll(&pfd, 1, 0));
    arm(worker);
    EXPECT_EQ(0, poll(&pfd, 1, 0));

    /* A pending signal makes the first arm report busy; the next one succeeds */
    ASSERT_UCS_OK(ucp_worker_signal(worker));
    EXPECT_EQ(UCS_ERR_BUSY, ucp_worker_arm(worker));
    EXPECT_EQ(UCS_OK, ucp_worker_arm(worker));
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_wakeup)
182 
183 class test_ucp_wakeup_external_epollfd : public test_ucp_wakeup {
184 public:
get_worker_params()185     virtual ucp_worker_params_t get_worker_params() {
186         ucp_worker_params_t params = test_ucp_wakeup::get_worker_params();
187         params.field_mask |= UCP_WORKER_PARAM_FIELD_EVENT_FD |
188                              UCP_WORKER_PARAM_FIELD_USER_DATA;
189         params.event_fd  = m_epfd;
190         params.user_data = USER_DATA;
191         return params;
192     }
193 
194 protected:
195     static void* const USER_DATA;
196 
init()197     virtual void init() {
198         m_epfd = epoll_create(1);
199         ASSERT_GE(m_epfd, 0);
200         test_ucp_wakeup::init();
201     }
202 
cleanup()203     virtual void cleanup() {
204         test_ucp_wakeup::cleanup();
205         close(m_epfd);
206     }
207 
208     int m_epfd;
209 };
210 
211 void* const test_ucp_wakeup_external_epollfd::USER_DATA = (void*)0x1337abcdef;
212 
213 
/*
 * Send one tagged message, then receive it while blocking in epoll_wait() on
 * the fixture's epoll fd whenever the receiver worker reports no progress and
 * could be armed. Each reported event is expected to carry USER_DATA.
 */
UCS_TEST_P(test_ucp_wakeup_external_epollfd, epoll_wait)
{
    const ucp_datatype_t DATATYPE = ucp_dt_make_contig(1);
    const uint64_t TAG = 0xdeadbeef;
    void *req;

    sender().connect(&receiver(), get_ep_params());

    uint64_t send_data = 0x12121212;
    req = ucp_tag_send_nb(sender().ep(), &send_data, sizeof(send_data), DATATYPE,
                          TAG, send_completion);
    if (UCS_PTR_IS_PTR(req)) {
        wait(req);
    } else {
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    }

    /* The receiver worker is used on every iteration below; look it up once */
    ucp_worker_h recv_worker = receiver().worker();

    uint64_t recv_data = 0;
    req = ucp_tag_recv_nb(recv_worker, &recv_data, sizeof(recv_data),
                          DATATYPE, TAG, (ucp_tag_t)-1, recv_completion);
    /* Only a real request handle may be queried for completion and released;
     * stop here if the receive could not be posted */
    ASSERT_TRUE(UCS_PTR_IS_PTR(req))
        << "ucp_tag_recv_nb() failed: "
        << ucs_status_string(UCS_PTR_STATUS(req));

    while (!ucp_request_is_completed(req)) {

        if (ucp_worker_progress(recv_worker)) {
            /* Got some receive events, check request */
            continue;
        }

        ucs_status_t status = ucp_worker_arm(recv_worker);
        if (status == UCS_ERR_BUSY) {
            /* Could not arm, poll again */
            ucp_worker_progress(recv_worker);
            continue;
        }
        ASSERT_UCS_OK(status);

        /* Worker is armed: block on the epoll fd, restarting on EINTR */
        struct epoll_event event;
        int ret;
        do {
            ret = epoll_wait(m_epfd, &event, 1, -1);
        } while ((ret < 0) && (errno == EINTR));
        if (ret < 0) {
            UCS_TEST_MESSAGE << "epoll_wait() failed: " << strerror(errno);
        }
        ASSERT_EQ(1, ret);

        /* The event is expected to carry the user data given at worker
         * creation */
        EXPECT_EQ(USER_DATA, event.data.ptr);
    }

    ucp_request_release(req);

    flush_worker(sender());
    EXPECT_EQ(send_data, recv_data);
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_wakeup_external_epollfd)
270