1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7
8 #include "test_ucp_tag.h"
9
10 #include <common/test_helpers.h>
11
12
13 class test_ucp_tag_probe : public test_ucp_tag {
14 public:
test_ucp_tag_probe()15 test_ucp_tag_probe() {
16 if (has_transport("tcp")) {
17 /* Decrease `TX_SEG_SIZE` and `RX_SEG_SIZE` parameters
18 * for TCP transport to be able fully consume receive
19 * buffer by 100-byte messages */
20 m_env.push_back(new ucs::scoped_setenv("UCX_TCP_TX_SEG_SIZE", "4k"));
21 m_env.push_back(new ucs::scoped_setenv("UCX_TCP_RX_SEG_SIZE", "4k"));
22 }
23 }
24
25 /* The parameters mean the following:
26 * - s_size and r_size: send and recv buffer sizes.
27 * Can be different for checking message transaction error
28 * - is_sync: specifies the type of send function to be used
29 * (sync or not)
30 * - is_recv_msg: specifies whether probe function needs to remove
31 * matched message. If yes, then ucp_tag_msg_recv_nb is used for
32 * receive
33 * */
test_send_probe(size_t s_size,size_t r_size,bool is_sync,int is_recv_msg)34 void test_send_probe (size_t s_size, size_t r_size, bool is_sync,
35 int is_recv_msg) {
36 ucp_tag_recv_info_t info;
37 ucp_tag_message_h message;
38 request *send_req = NULL;
39 request *recv_req = NULL;
40
41 std::vector<char> sendbuf(s_size, 0);
42 std::vector<char> recvbuf(r_size, 0);
43
44 ucs::fill_random(sendbuf);
45
46 message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff,
47 is_recv_msg, &info);
48 EXPECT_TRUE(message == NULL);
49
50 if (is_sync) {
51 send_req = send_sync_nb(&sendbuf[0], sendbuf.size(), DATATYPE,
52 0x111337);
53
54 } else {
55 send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
56 }
57
58 do {
59 progress();
60 message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff,
61 is_recv_msg, &info);
62 } while (message == NULL);
63
64 EXPECT_EQ(sendbuf.size(), info.length);
65 EXPECT_EQ((ucp_tag_t)0x111337, info.sender_tag);
66
67 if (is_recv_msg == 0) {
68 recv_req = recv_nb(&recvbuf[0], recvbuf.size(), DATATYPE,
69 0x1337, 0xffff);
70 } else {
71 recv_req = (request*)ucp_tag_msg_recv_nb(receiver().worker(),
72 &recvbuf[0],recvbuf.size(),
73 DATATYPE, message, recv_callback);
74 ASSERT_TRUE(!UCS_PTR_IS_ERR(recv_req));
75 }
76
77 wait(recv_req);
78 EXPECT_TRUE(recv_req->completed);
79 if (s_size != r_size) {
80 /* Test for correct msg transaction handling */
81 EXPECT_EQ(UCS_ERR_MESSAGE_TRUNCATED, recv_req->status);
82 } else {
83 /* Everything should be received correctly */
84 EXPECT_EQ(UCS_OK, recv_req->status);
85 EXPECT_EQ(sendbuf.size(), recv_req->info.length);
86 EXPECT_EQ((ucp_tag_t)0x111337, recv_req->info.sender_tag);
87 EXPECT_EQ(sendbuf, recvbuf);
88 }
89 request_release(recv_req);
90
91 if (UCS_PTR_IS_PTR(send_req)) {
92 wait(send_req);
93 EXPECT_TRUE(send_req->completed);
94 EXPECT_EQ(UCS_OK, send_req->status);
95 request_release(send_req);
96 }
97 }
98
probe_all(std::string & recvbuf)99 int probe_all(std::string &recvbuf)
100 {
101 ucp_tag_message_h message;
102 ucp_tag_recv_info_t info;
103 request *req;
104
105 int count = 0;
106 for (;;) {
107 message = ucp_tag_probe_nb(receiver().worker(), 0, 0, 1, &info);
108 if (message == NULL) {
109 return count;
110 }
111
112 req = (request*)ucp_tag_msg_recv_nb(receiver().worker(),
113 &recvbuf[0], recvbuf.size(),
114 DATATYPE, message, recv_callback);
115 wait(req);
116 request_release(req);
117 ++count;
118 }
119 }
120 };
121
122
UCS_TEST_P(test_ucp_tag_probe,send_probe)123 UCS_TEST_P(test_ucp_tag_probe, send_probe) {
124 test_send_probe (8, 8, false, 0);
125 test_send_probe (8, 8, true, 0);
126 }
127
128 UCS_TEST_P(test_ucp_tag_probe, send_medium_msg_probe, "RNDV_THRESH=1048576") {
129 test_send_probe (50000, 50000, false, 1);
130 test_send_probe (50000, 50000, true, 1);
131 }
132
133 UCS_TEST_P(test_ucp_tag_probe, send_medium_msg_probe_truncated, "RNDV_THRESH=1048576") {
134 test_send_probe (50000, 0, false, 1);
135 test_send_probe (50000, 0, true, 1);
136 }
137
138 UCS_TEST_P(test_ucp_tag_probe, send_rndv_msg_probe, "RNDV_THRESH=1048576") {
139 static const size_t size = 1148576;
140 ucp_tag_recv_info_t info;
141 ucp_tag_message_h message;
142 request *my_send_req, *my_recv_req;
143
144 skip_loopback();
145
146 std::vector<char> sendbuf(size, 0);
147 std::vector<char> recvbuf(size, 0);
148
149 ucs::fill_random(sendbuf);
150
151 message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff, 1, &info);
152 EXPECT_TRUE(message == NULL);
153
154 /* sender - send the RTS */
155 my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
156 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
157
158 /* receiver - get the RTS and put it into unexpected */
159 wait_for_unexpected_msg(receiver().worker(), 10.0);
160
161 /* receiver - match the rts, remove it from unexpected and return it */
162 message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff, 1, &info);
163 /* make sure that there was a match (RTS) */
164 ASSERT_TRUE(message != NULL);
165 EXPECT_EQ(sendbuf.size(), info.length);
166 EXPECT_EQ((ucp_tag_t)0x111337, info.sender_tag);
167
168 /* receiver - process the rts and schedule a get operation */
169 my_recv_req = (request*)ucp_tag_msg_recv_nb(receiver().worker(), &recvbuf[0],
170 recvbuf.size(), DATATYPE, message,
171 recv_callback);
172 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
173
174 /* receiver - perform rndv get and send the ATS */
175 wait(my_recv_req);
176 EXPECT_TRUE(my_recv_req->completed);
177
178 /* sender - get the ATS and set send request to completed */
179 short_progress_loop();
180
181 EXPECT_EQ(UCS_OK, my_recv_req->status);
182 EXPECT_EQ(sendbuf.size(), my_recv_req->info.length);
183 EXPECT_EQ((ucp_tag_t)0x111337, my_recv_req->info.sender_tag);
184 EXPECT_EQ(sendbuf, recvbuf);
185
186 wait_and_validate(my_send_req);
187 request_release(my_recv_req);
188 }
189
190 UCS_TEST_P(test_ucp_tag_probe, send_2_msg_probe, "RNDV_THRESH=inf") {
191 const ucp_datatype_t DT_INT = ucp_dt_make_contig(sizeof(int));
192 const ucp_tag_t TAG = 0xaaa;
193 const size_t COUNT = 20000;
194 std::vector<request*> reqs;
195
196 /*
197 * send in order: 1, 2
198 */
199 std::vector<int> sdata1(COUNT, 1);
200 std::vector<int> sdata2(COUNT, 2);
201 request *sreq1 = send_nb(&sdata1[0], COUNT, DT_INT, TAG);
202 if (sreq1 != NULL) {
203 reqs.push_back(sreq1);
204 }
205 request *sreq2 = send_nb(&sdata2[0], COUNT, DT_INT, TAG);
206 if (sreq2 != NULL) {
207 reqs.push_back(sreq2);
208 }
209
210 /*
211 * probe in order: 1, 2
212 */
213 ucp_tag_message_h message1, message2;
214 ucp_tag_recv_info_t info;
215 do {
216 progress();
217 message1 = ucp_tag_probe_nb(receiver().worker(), TAG, 0xffff, 1, &info);
218 } while (message1 == NULL);
219 do {
220 progress();
221 message2 = ucp_tag_probe_nb(receiver().worker(), TAG, 0xffff, 1, &info);
222 } while (message2 == NULL);
223
224 /*
225 * receive in **reverse** order: 2, 1
226 */
227 std::vector<int> rdata2(COUNT);
228 request *rreq2 = (request*)ucp_tag_msg_recv_nb(receiver().worker(), &rdata2[0],
229 COUNT, DT_INT, message2,
230 recv_callback);
231 reqs.push_back(rreq2);
232 ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq2));
233 wait(rreq2);
234
235 std::vector<int> rdata1(COUNT);
236 request *rreq1 = (request*)ucp_tag_msg_recv_nb(receiver().worker(), &rdata1[0],
237 COUNT, DT_INT, message1,
238 recv_callback);
239 reqs.push_back(rreq1);
240 ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq1));
241 wait(rreq1);
242
243 if (sreq1 != NULL) {
244 wait(sreq1);
245 }
246 if (sreq2 != NULL) {
247 wait(sreq2);
248 }
249
250 /*
251 * expect data to arrive in probe order (rather than recv order)
252 */
253 EXPECT_EQ(sdata1, rdata1);
254 EXPECT_EQ(sdata2, rdata2);
255 while (!reqs.empty()) {
256 request *req = reqs.back();
257 EXPECT_TRUE(req->completed);
258 EXPECT_EQ(UCS_OK, req->status);
259 request_release(req);
260 reqs.pop_back();
261 }
262 }
263
UCS_TEST_P(test_ucp_tag_probe,limited_probe_size)264 UCS_TEST_P(test_ucp_tag_probe, limited_probe_size) {
265 static const int COUNT = 1000;
266 std::string sendbuf, recvbuf;
267 std::vector<request*> reqs;
268 ucp_tag_recv_info_t info;
269 request *req;
270 int recvd;
271
272 skip_loopback();
273
274 sendbuf.resize(100, '1');
275 recvbuf.resize(100, '0');
276
277 send_b(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
278 recv_b(&recvbuf[0], recvbuf.size(), DATATYPE, 0x111337, 0xffffff, &info);
279
280 /* send 1000 messages without calling progress */
281 for (int i = 0; i < COUNT; ++i) {
282 req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
283 if (req != NULL) {
284 reqs.push_back(req);
285 }
286
287 sender().progress(); /* progress only the sender */
288 }
289
290 for (int i = 0; i < 1000; ++i) {
291 ucs::safe_usleep(1000);
292 sender().progress();
293 }
294
295 /* progress once */
296 ucp_worker_progress(receiver().worker());
297
298 /* probe should not have too many messages here because we poll once */
299 recvd = probe_all(recvbuf);
300 EXPECT_LE(recvd, 128);
301
302 /* receive all the rest */
303 while (recvd < COUNT) {
304 progress();
305 recvd += probe_all(recvbuf);
306 }
307
308 while (!reqs.empty()) {
309 wait(reqs.back());
310 request_release(reqs.back());
311 reqs.pop_back();
312 }
313 }
314 UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_probe)
315