1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include "ucp_test.h"
8
9 #include <algorithm>
10 #include <sys/epoll.h>
11 #include <sys/poll.h>
12
13
14 class test_ucp_wakeup : public ucp_test {
15 public:
get_ctx_params()16 static ucp_params_t get_ctx_params() {
17 ucp_params_t params = ucp_test::get_ctx_params();
18 params.features |= UCP_FEATURE_TAG | UCP_FEATURE_WAKEUP;
19 return params;
20 }
21
22 protected:
send_completion(void * request,ucs_status_t status)23 static void send_completion(void *request, ucs_status_t status) {
24 ++comp_cntr;
25 }
26
recv_completion(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)27 static void recv_completion(void *request, ucs_status_t status,
28 ucp_tag_recv_info_t *info) {
29 ++comp_cntr;
30 }
31
wait(void * req)32 void wait(void *req) {
33 do {
34 progress();
35 } while (!ucp_request_is_completed(req));
36 ucp_request_release(req);
37 }
38
arm(ucp_worker_h worker)39 void arm(ucp_worker_h worker) {
40 ucs_status_t status;
41 do {
42 status = ucp_worker_arm(worker);
43 } while (UCS_ERR_BUSY == status);
44 ASSERT_EQ(UCS_OK, status);
45 }
46
47 static size_t comp_cntr;
48 };
49
50 size_t test_ucp_wakeup::comp_cntr = 0;
51
/*
 * Receive a tag message while blocking on the receiver worker's event fd.
 *
 * The send is issued and waited for first. The receive side then loops over
 * progress -> arm -> poll(): the fd is polled only when
 * ucp_worker_progress() reported no work and ucp_worker_arm() returned UCS_OK.
 */
UCS_TEST_P(test_ucp_wakeup, efd)
{
    const ucp_datatype_t DATATYPE = ucp_dt_make_contig(1);
    const uint64_t TAG            = 0xdeadbeef;
    ucp_worker_h recv_worker;
    int recv_efd;
    void *req;

    sender().connect(&receiver(), get_ep_params());

    recv_worker = receiver().worker();
    ASSERT_UCS_OK(ucp_worker_get_efd(recv_worker, &recv_efd));

    uint64_t send_data = 0x12121212;
    req = ucp_tag_send_nb(sender().ep(), &send_data, sizeof(send_data), DATATYPE,
                          TAG, send_completion);
    if (UCS_PTR_IS_PTR(req)) {
        wait(req);
    } else {
        /* No request was returned: the value encodes the final status */
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    }

    uint64_t recv_data = 0;
    req = ucp_tag_recv_nb(recv_worker, &recv_data, sizeof(recv_data),
                          DATATYPE, TAG, (ucp_tag_t)-1, recv_completion);
    /* An error-encoded pointer must never reach ucp_request_is_completed()
     * or ucp_request_release(), both of which treat it as a real request */
    ASSERT_TRUE(UCS_PTR_IS_PTR(req)) << "ucp_tag_recv_nb() did not return a request";

    while (!ucp_request_is_completed(req)) {

        if (ucp_worker_progress(recv_worker)) {
            /* Got some receive events, check request */
            continue;
        }

        ucs_status_t status = ucp_worker_arm(recv_worker);
        if (status == UCS_ERR_BUSY) {
            /* Could not arm, poll again */
            ucp_worker_progress(recv_worker);
            continue;
        }
        ASSERT_UCS_OK(status);

        /* Worker is armed: block on its event fd, restarting on EINTR */
        int ret;
        do {
            struct pollfd pollfd;
            pollfd.events = POLLIN;
            pollfd.fd     = recv_efd;
            ret = poll(&pollfd, 1, -1);
        } while ((ret < 0) && (errno == EINTR));
        if (ret < 0) {
            UCS_TEST_MESSAGE << "poll() failed: " << strerror(errno);
        }
        ASSERT_EQ(1, ret);

        /* The fd has just been reported readable, so arming again is
         * expected to report UCS_ERR_BUSY */
        EXPECT_EQ(UCS_ERR_BUSY, ucp_worker_arm(recv_worker));
    }

    ucp_request_release(req);

    flush_worker(sender());
    EXPECT_EQ(send_data, recv_data);
}
111
/* This test does not progress the receiver's worker while it waits for
 * events on the sender's worker fd. With the TCP transport this leads to
 * a hang, because nothing progresses the TCP CM message exchange (TCP
 * has no async progress for such events).
 * TODO: add async progress for TCP connections */
118 UCS_TEST_SKIP_COND_P(test_ucp_wakeup, tx_wait,
119 has_transport("tcp"), "ZCOPY_THRESH=10000")
120 {
121 const ucp_datatype_t DATATYPE = ucp_dt_make_contig(1);
122 const size_t COUNT = 20000;
123 const uint64_t TAG = 0xdeadbeef;
124 std::string send_data(COUNT, '2'), recv_data(COUNT, '1');
125 void *sreq, *rreq;
126
127 sender().connect(&receiver(), get_ep_params());
128
129 rreq = ucp_tag_recv_nb(receiver().worker(), &recv_data[0], COUNT, DATATYPE,
130 TAG, (ucp_tag_t)-1, recv_completion);
131
132 sreq = ucp_tag_send_nb(sender().ep(), &send_data[0], COUNT, DATATYPE, TAG,
133 send_completion);
134
135 if (UCS_PTR_IS_PTR(sreq)) {
136 /* wait for send completion */
137 do {
138 ucp_worker_wait(sender().worker());
139 while (progress());
140 } while (!ucp_request_is_completed(sreq));
141 ucp_request_release(sreq);
142 } else {
143 ASSERT_UCS_OK(UCS_PTR_STATUS(sreq));
144 }
145
146 wait(rreq);
147
148 EXPECT_EQ(send_data, recv_data);
149 }
150
/*
 * Check how ucp_worker_signal() and arming interact with the event fd of the
 * sender's worker, using non-blocking poll() calls (timeout 0).
 */
UCS_TEST_P(test_ucp_wakeup, signal)
{
    ucp_worker_h worker = sender().worker();

    int efd;
    ASSERT_UCS_OK(ucp_worker_get_efd(worker, &efd));

    struct pollfd pfd;
    pfd.events = POLLIN;
    pfd.fd     = efd;

    /* Nothing has been signaled yet: the fd must not be readable */
    EXPECT_EQ(0, poll(&pfd, 1, 0));
    arm(worker);

    /* One signal makes the fd readable; arming clears it again */
    ASSERT_UCS_OK(ucp_worker_signal(worker));
    EXPECT_EQ(1, poll(&pfd, 1, 0));
    arm(worker);
    EXPECT_EQ(0, poll(&pfd, 1, 0));

    /* Two back-to-back signals are still cleared by a single arm() helper call */
    for (int i = 0; i < 2; ++i) {
        ASSERT_UCS_OK(ucp_worker_signal(worker));
    }
    EXPECT_EQ(1, poll(&pfd, 1, 0));
    arm(worker);
    EXPECT_EQ(0, poll(&pfd, 1, 0));

    /* With a signal pending, the first raw arm is expected to report
     * UCS_ERR_BUSY and the following one UCS_OK */
    ASSERT_UCS_OK(ucp_worker_signal(worker));
    EXPECT_EQ(UCS_ERR_BUSY, ucp_worker_arm(worker));
    EXPECT_EQ(UCS_OK, ucp_worker_arm(worker));
}
180
/* Instantiate the parameterized test_ucp_wakeup tests through the UCP test
 * instantiation macro (defined outside this file) */
UCP_INSTANTIATE_TEST_CASE(test_ucp_wakeup)
182
183 class test_ucp_wakeup_external_epollfd : public test_ucp_wakeup {
184 public:
get_worker_params()185 virtual ucp_worker_params_t get_worker_params() {
186 ucp_worker_params_t params = test_ucp_wakeup::get_worker_params();
187 params.field_mask |= UCP_WORKER_PARAM_FIELD_EVENT_FD |
188 UCP_WORKER_PARAM_FIELD_USER_DATA;
189 params.event_fd = m_epfd;
190 params.user_data = USER_DATA;
191 return params;
192 }
193
194 protected:
195 static void* const USER_DATA;
196
init()197 virtual void init() {
198 m_epfd = epoll_create(1);
199 ASSERT_GE(m_epfd, 0);
200 test_ucp_wakeup::init();
201 }
202
cleanup()203 virtual void cleanup() {
204 test_ucp_wakeup::cleanup();
205 close(m_epfd);
206 }
207
208 int m_epfd;
209 };
210
211 void* const test_ucp_wakeup_external_epollfd::USER_DATA = (void*)0x1337abcdef;
212
213
/*
 * Receive a tag message while blocking in epoll_wait() on the fixture's
 * epoll fd (m_epfd), and check that the reported event carries USER_DATA.
 *
 * The send is issued and waited for first. The receive side then loops over
 * progress -> arm -> epoll_wait(): it blocks only when
 * ucp_worker_progress() reported no work and ucp_worker_arm() returned UCS_OK.
 */
UCS_TEST_P(test_ucp_wakeup_external_epollfd, epoll_wait)
{
    const ucp_datatype_t DATATYPE = ucp_dt_make_contig(1);
    const uint64_t TAG            = 0xdeadbeef;
    void *req;

    sender().connect(&receiver(), get_ep_params());

    uint64_t send_data = 0x12121212;
    req = ucp_tag_send_nb(sender().ep(), &send_data, sizeof(send_data), DATATYPE,
                          TAG, send_completion);
    if (UCS_PTR_IS_PTR(req)) {
        wait(req);
    } else {
        /* No request was returned: the value encodes the final status */
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    }

    /* The receiver's worker does not change during the test: look it up once */
    ucp_worker_h recv_worker = receiver().worker();

    uint64_t recv_data = 0;
    req = ucp_tag_recv_nb(recv_worker, &recv_data, sizeof(recv_data),
                          DATATYPE, TAG, (ucp_tag_t)-1, recv_completion);
    /* An error-encoded pointer must never reach ucp_request_is_completed()
     * or ucp_request_release(), both of which treat it as a real request */
    ASSERT_TRUE(UCS_PTR_IS_PTR(req)) << "ucp_tag_recv_nb() did not return a request";

    while (!ucp_request_is_completed(req)) {

        if (ucp_worker_progress(recv_worker)) {
            /* Got some receive events, check request */
            continue;
        }

        ucs_status_t status = ucp_worker_arm(recv_worker);
        if (status == UCS_ERR_BUSY) {
            /* Could not arm, poll again */
            ucp_worker_progress(recv_worker);
            continue;
        }
        ASSERT_UCS_OK(status);

        /* Worker is armed: block on the epoll fd, restarting on EINTR */
        struct epoll_event event;
        int ret;
        do {
            ret = epoll_wait(m_epfd, &event, 1, -1);
        } while ((ret < 0) && (errno == EINTR));
        if (ret < 0) {
            UCS_TEST_MESSAGE << "epoll_wait() failed: " << strerror(errno);
        }
        ASSERT_EQ(1, ret);

        /* The event must carry the user data configured for the worker */
        EXPECT_EQ(USER_DATA, event.data.ptr);
    }

    ucp_request_release(req);

    flush_worker(sender());
    EXPECT_EQ(send_data, recv_data);
}
268
/* Instantiate the parameterized test_ucp_wakeup_external_epollfd tests through
 * the UCP test instantiation macro (defined outside this file) */
UCP_INSTANTIATE_TEST_CASE(test_ucp_wakeup_external_epollfd)
270