1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7
8 #include "test_ucp_tag.h"
9
10 #include <common/test_helpers.h>
11 extern "C" {
12 #include <ucp/core/ucp_request.h>
13 #include <ucp/core/ucp_types.h>
14 }
15
16 using namespace ucs; /* For vector<char> serialization */
17
18
19 class test_ucp_tag_match : public test_ucp_tag {
20 public:
test_ucp_tag_match()21 test_ucp_tag_match() {
22 // TODO: test offload and offload MP as different variants
23 enable_tag_mp_offload();
24 if (RUNNING_ON_VALGRIND) {
25 m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_SEG_SIZE", "8k"));
26 m_env.push_back(new ucs::scoped_setenv("UCX_TCP_RX_SEG_SIZE", "8k"));
27 }
28 }
29
init()30 virtual void init()
31 {
32 modify_config("TM_THRESH", "1");
33
34 test_ucp_tag::init();
35 ucp_test_param param = GetParam();
36 }
37
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)38 static std::vector<ucp_test_param> enum_test_params(const ucp_params_t& ctx_params,
39 const std::string& name,
40 const std::string& test_case_name,
41 const std::string& tls)
42 {
43 std::vector<ucp_test_param> result;
44 generate_test_params_variant(ctx_params, name, test_case_name, tls,
45 RECV_REQ_INTERNAL, result);
46 generate_test_params_variant(ctx_params, name, test_case_name, tls,
47 RECV_REQ_EXTERNAL, result);
48 return result;
49 }
50
is_external_request()51 virtual bool is_external_request()
52 {
53 return GetParam().variant == RECV_REQ_EXTERNAL;
54 }
55
56 protected:
recv_callback_release_req(void * request,ucs_status_t status,ucp_tag_recv_info_t * info)57 static void recv_callback_release_req(void *request, ucs_status_t status,
58 ucp_tag_recv_info_t *info)
59 {
60 ucp_request_free(request);
61 m_req_status = status;
62 }
63
64 static ucs_status_t m_req_status;
65 };
66
67 ucs_status_t test_ucp_tag_match::m_req_status = UCS_OK;
68
69
/* Blocking send of an 8-byte value, then a blocking receive after the message
 * has already been progressed on the receiver (unexpected path). The receive
 * uses tag 0x1337 with mask 0xffff; the sender tag is 0x111337. */
UCS_TEST_P(test_ucp_tag_match, send_recv_unexp) {
    uint64_t payload = 0xdeadbeefdeadbeef;
    uint64_t inbox   = 0;

    send_b(&payload, sizeof(payload), DATATYPE, 0x111337);

    short_progress_loop(); /* Receive messages as unexpected */

    ucp_tag_recv_info_t rx_info;
    ucs_status_t rc = recv_b(&inbox, sizeof(inbox), DATATYPE, 0x1337, 0xffff,
                             &rx_info);
    ASSERT_UCS_OK(rc);

    /* Length, full sender tag and contents must all match what was sent */
    EXPECT_EQ(sizeof(payload), rx_info.length);
    EXPECT_EQ((ucp_tag_t)0x111337, rx_info.sender_tag);
    EXPECT_EQ(payload, inbox);
}
88
89 UCS_TEST_SKIP_COND_P(test_ucp_tag_match, send_recv_unexp_rqfree,
90 /* request free cannot be used for external requests */
91 (GetParam().variant == RECV_REQ_EXTERNAL)) {
92 request *my_recv_req;
93 uint64_t send_data = 0xdeadbeefdeadbeef;
94 uint64_t recv_data = 0;
95
96 my_recv_req = recv_nb(&recv_data, sizeof(recv_data), DATATYPE, 0x1337, 0xffff);
97 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
98
99 request_free(my_recv_req);
100
101 send_b(&send_data, sizeof(send_data), DATATYPE, 0x1337);
102
103 wait_for_flag(&recv_data);
104 EXPECT_EQ(send_data, recv_data);
105 }
106
/* Expected receive of a 50000-byte message: the receive is posted first,
 * then the message is sent with a blocking send. */
UCS_TEST_P(test_ucp_tag_match, send_recv_exp_medium) {
    static const size_t msg_len = 50000;

    std::vector<char> tx(msg_len, 0);
    std::vector<char> rx(msg_len, 0);

    ucs::fill_random(tx);

    request *rreq = recv_nb(&rx[0], rx.size(), DATATYPE, 0x1337, 0xffff);
    ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq));
    ASSERT_TRUE(rreq != NULL); /* Couldn't be completed because didn't send yet */

    send_b(&tx[0], tx.size(), DATATYPE, 0x111337);

    wait(rreq);

    /* Check the reported length and sender tag, then the payload itself */
    EXPECT_EQ(tx.size(), rreq->info.length);
    EXPECT_EQ((ucp_tag_t)0x111337, rreq->info.sender_tag);
    EXPECT_EQ(tx, rx);
    request_release(rreq);
}
129
/* Two consecutive expected receives of 50000 bytes on the same buffers: the
 * first is fed by a blocking send, the second by a non-blocking send whose
 * request (if any) is checked for successful completion. */
UCS_TEST_P(test_ucp_tag_match, send2_nb_recv_exp_medium) {
    static const size_t msg_len = 50000;

    std::vector<char> tx(msg_len, 0);
    std::vector<char> rx(msg_len, 0);

    /* 1st send */

    request *rreq = recv_nb(&rx[0], rx.size(), DATATYPE, 0x1337, 0xffff);
    ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq));
    ASSERT_TRUE(rreq != NULL); /* Couldn't be completed because didn't send yet */

    send_b(&tx[0], tx.size(), DATATYPE, 0x111337);

    wait(rreq);
    request_release(rreq);

    /* 2nd send */

    /* Only the second round uses random data; the first one sent zeros */
    ucs::fill_random(tx);

    rreq = recv_nb(&rx[0], rx.size(), DATATYPE, 0x1337, 0xffff);
    ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq));
    ASSERT_TRUE(rreq != NULL); /* Couldn't be completed because didn't send yet */

    request *sreq = send_nb(&tx[0], tx.size(), DATATYPE, 0x111337);
    ASSERT_TRUE(!UCS_PTR_IS_ERR(sreq));

    wait(rreq);

    EXPECT_EQ(tx.size(), rreq->info.length);
    EXPECT_EQ((ucp_tag_t)0x111337, rreq->info.sender_tag);
    EXPECT_EQ(tx, rx);

    short_progress_loop();

    /* A NULL send request means there is nothing to check or release */
    if (sreq != NULL) {
        EXPECT_TRUE(sreq->completed);
        EXPECT_EQ(UCS_OK, sreq->status);
        request_release(sreq);
    }
    request_release(rreq);
}
175
176 UCS_TEST_P(test_ucp_tag_match, send2_nb_recv_medium_wildcard, "RNDV_THRESH=inf") {
177 static const size_t size = 3000000;
178
179 entity &sender2 = sender();
180 create_entity(true);
181 sender().connect(&receiver(), get_ep_params());
182
183 for (int is_exp = 0; is_exp <= 1; ++is_exp) {
184
185 UCS_TEST_MESSAGE << "Testing " << (is_exp ? "" : "un") << "expected mode, size " << size;
186
187 std::vector<char> sendbuf1(size, 0);
188 std::vector<char> sendbuf2(size, 0);
189 std::vector<char> recvbuf1(size, 0);
190 std::vector<char> recvbuf2(size, 0);
191
192 ucs::fill_random(sendbuf1);
193 ucs::fill_random(sendbuf2);
194
195 /* Two sends with different tags */
196
197 request *sreq1, *sreq2;
198 sreq1 = (request*)ucp_tag_send_nb(sender().ep(), &sendbuf1[0], sendbuf1.size(),
199 DATATYPE, 1, send_callback);
200 ASSERT_TRUE(!UCS_PTR_IS_ERR(sreq1));
201
202 sreq2 = (request*)ucp_tag_send_nb(sender2.ep(), &sendbuf2[0], sendbuf2.size(),
203 DATATYPE, 2, send_callback);
204 ASSERT_TRUE(!UCS_PTR_IS_ERR(sreq2));
205
206
207 /* In unexpected mode, we progress all to put the messages on the
208 * unexpected queue
209 */
210 if (!is_exp) {
211 short_progress_loop();
212 }
213
214 /* Two receives with any tag */
215
216 request *rreq1, *rreq2;
217
218 rreq1 = recv_nb(&recvbuf1[0], recvbuf1.size(), DATATYPE, 0, 0);
219 ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq1));
220
221 rreq2 = recv_nb(&recvbuf2[0], recvbuf2.size(), DATATYPE, 0, 0);
222 ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq2));
223
224
225 /* Wait for receives */
226 wait(rreq1);
227 wait(rreq2);
228
229 short_progress_loop();
230
231 /* Release sends */
232 if (sreq1 != NULL) {
233 wait(sreq1);
234 EXPECT_TRUE(sreq1->completed);
235 request_release(sreq1);
236 }
237 if (sreq2 != NULL) {
238 wait(sreq2);
239 EXPECT_TRUE(sreq2->completed);
240 request_release(sreq2);
241 }
242
243 /* Receives should be completed with correct length */
244 ASSERT_TRUE(rreq1->completed);
245 ASSERT_TRUE(rreq2->completed);
246
247 EXPECT_EQ(size, rreq1->info.length);
248 EXPECT_EQ(size, rreq2->info.length);
249
250 /* The order may be any, but the messages have to be received correctly */
251 if (rreq1->info.sender_tag == 1u) {
252 ASSERT_EQ(2u, rreq2->info.sender_tag);
253 EXPECT_EQ(sendbuf1, recvbuf1);
254 EXPECT_EQ(sendbuf2, recvbuf2);
255 } else {
256 ASSERT_EQ(2u, rreq1->info.sender_tag);
257 ASSERT_EQ(1u, rreq2->info.sender_tag);
258 EXPECT_EQ(sendbuf2, recvbuf1);
259 EXPECT_EQ(sendbuf1, recvbuf2);
260 }
261
262 request_release(rreq1);
263 request_release(rreq2);
264 }
265 }
266
/* Expected receive of 50000 bytes where, after the blocking send, the test
 * sleeps 1000 microseconds and calls progress() once before waiting for the
 * receive to complete. */
UCS_TEST_P(test_ucp_tag_match, send_recv_nb_partial_exp_medium) {
    static const size_t msg_len = 50000;

    std::vector<char> tx(msg_len, 0);
    std::vector<char> rx(msg_len, 0);

    ucs::fill_random(tx);

    request *rreq = recv_nb(&rx[0], rx.size(), DATATYPE, 0x1337, 0xffff);
    ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq));

    send_b(&tx[0], tx.size(), DATATYPE, 0x111337);

    /* One delayed progress call ahead of the full wait */
    usleep(1000);
    progress();

    wait(rreq);

    EXPECT_EQ(tx.size(), rreq->info.length);
    EXPECT_EQ((ucp_tag_t)0x111337, rreq->info.sender_tag);
    EXPECT_EQ(tx, rx);

    request_release(rreq);
}
292
/* Non-blocking send of an 8-byte value, one progress call on the receiver
 * worker, then a blocking receive. The send request (if one was returned)
 * is expected to be completed with UCS_OK afterwards. */
UCS_TEST_P(test_ucp_tag_match, send_nb_recv_unexp) {
    uint64_t payload = 0xdeadbeefdeadbeef;
    uint64_t inbox   = 0;

    request *sreq = send_nb(&payload, sizeof(payload), DATATYPE, 0x111337);
    ASSERT_TRUE(!UCS_PTR_IS_ERR(sreq));

    ucp_worker_progress(receiver().worker());

    ucp_tag_recv_info_t rx_info;
    ucs_status_t rc = recv_b(&inbox, sizeof(inbox), DATATYPE, 0x1337, 0xffff,
                             &rx_info);
    ASSERT_UCS_OK(rc);

    EXPECT_EQ(sizeof(payload), rx_info.length);
    EXPECT_EQ((ucp_tag_t)0x111337, rx_info.sender_tag);
    EXPECT_EQ(payload, inbox);

    /* A NULL send request means there is nothing to check or release */
    if (sreq != NULL) {
        EXPECT_TRUE(sreq->completed);
        EXPECT_EQ(UCS_OK, sreq->status);
        request_release(sreq);
    }
}
319
/* Receive a message with a callback that frees the request on its own
 * (recv_callback_release_req); the test only polls m_req_status and never
 * touches the request after posting it. */
UCS_TEST_P(test_ucp_tag_match, send_recv_cb_release) {
    uint64_t payload = 0xdeadbeefdeadbeef;

    send_b(&payload, sizeof(payload), DATATYPE, 0x111337);

    short_progress_loop(); /* Receive messages as unexpected */

    /* Armed here; the callback overwrites it with the completion status */
    m_req_status = UCS_INPROGRESS;

    uint64_t inbox;
    request *rreq = (request*)ucp_tag_recv_nb(receiver().worker(), &inbox,
                                              sizeof(inbox), DATATYPE, 0, 0,
                                              recv_callback_release_req);
    if (UCS_PTR_IS_ERR(rreq)) {
        /* Report the error status carried by the pointer */
        ASSERT_UCS_OK(UCS_PTR_STATUS(rreq));
    } else if (rreq == NULL) {
        UCS_TEST_ABORT("ucp_tag_recv_nb returned NULL");
    } else {
        /* request would be completed and released by the callback */
        while (m_req_status == UCS_INPROGRESS) {
            progress();
        }
        ASSERT_UCS_OK(m_req_status);
    }
}
346
/* Receiving an 8-byte unexpected message into a zero-length buffer must
 * report UCS_ERR_MESSAGE_TRUNCATED. */
UCS_TEST_P(test_ucp_tag_match, send_recv_truncated) {
    uint64_t payload = 0xdeadbeefdeadbeef;

    send_b(&payload, sizeof(payload), DATATYPE, 0x111337);

    short_progress_loop(); /* Receive messages as unexpected */

    /* NULL buffer with zero length is smaller than the message that was sent */
    ucp_tag_recv_info_t rx_info;
    ucs_status_t rc = recv_b(NULL, 0, DATATYPE, 0x1337, 0xffff, &rx_info);
    EXPECT_EQ(UCS_ERR_MESSAGE_TRUNCATED, rc);
}
360
/* Expected receive of an 8-byte value: post the receive, send with a blocking
 * send, then check completion flag, status, length, sender tag and data. */
UCS_TEST_P(test_ucp_tag_match, send_recv_nb_exp) {
    uint64_t payload = 0xdeadbeefdeadbeef;
    uint64_t inbox   = 0;

    request *rreq = recv_nb(&inbox, sizeof(inbox), DATATYPE, 0x1337, 0xffff);

    ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq));
    ASSERT_TRUE(rreq != NULL); /* Couldn't be completed because didn't send yet */

    send_b(&payload, sizeof(payload), DATATYPE, 0x111337);

    wait(rreq);

    EXPECT_TRUE(rreq->completed);
    EXPECT_EQ(UCS_OK, rreq->status);
    EXPECT_EQ(sizeof(payload), rreq->info.length);
    EXPECT_EQ((ucp_tag_t)0x111337, rreq->info.sender_tag);
    EXPECT_EQ(payload, inbox);
    request_release(rreq);
}
383
/* Issue 1000 non-blocking sends of the same 8-byte value, progress the
 * receiver worker once, receive all of them with blocking receives, and
 * finally check and release every send request that was returned. */
UCS_TEST_P(test_ucp_tag_match, send_nb_multiple_recv_unexp) {
    const unsigned total = 1000;

    uint64_t payload = 0xdeadbeefdeadbeef;
    uint64_t inbox   = 0;

    std::vector<request*> pending(total);

    skip_loopback();

    /* Phase 1: post all sends */
    for (unsigned idx = 0; idx < total; ++idx) {
        pending[idx] = send_nb(&payload, sizeof(payload), DATATYPE, 0x111337);
        ASSERT_TRUE(!UCS_PTR_IS_ERR(pending[idx]));
    }

    ucp_worker_progress(receiver().worker());

    /* Phase 2: receive each message and validate it */
    ucp_tag_recv_info_t rx_info;
    for (unsigned idx = 0; idx < total; ++idx) {
        ucs_status_t rc = recv_b(&inbox, sizeof(inbox), DATATYPE, 0x1337,
                                 0xffff, &rx_info);
        ASSERT_UCS_OK(rc);
        ASSERT_EQ(total, pending.size());

        EXPECT_EQ(sizeof(payload), rx_info.length);
        EXPECT_EQ((ucp_tag_t)0x111337, rx_info.sender_tag);
        EXPECT_EQ(payload, inbox);
    }

    /* Phase 3: sends that returned a request must be completed with UCS_OK */
    for (unsigned idx = 0; idx < total; ++idx) {
        request *sreq = pending[idx];
        if (sreq == NULL) {
            continue;
        }

        EXPECT_TRUE(sreq->completed);
        EXPECT_EQ(UCS_OK, sreq->status);
        request_release(sreq);
    }
}
422
/* Synchronous send of an 8-byte value received as unexpected.
 *
 * The send request must still be incomplete after the message was progressed
 * but before the receiver posted a matching receive, and it must be completed
 * with UCS_OK once the receive was done and progress was made again. */
UCS_TEST_P(test_ucp_tag_match, sync_send_unexp) {
    ucp_tag_recv_info_t info;
    ucs_status_t status;

    uint64_t send_data = 0x0102030405060708;
    uint64_t recv_data = 0;

    request *my_send_req = send_sync_nb(&send_data, sizeof(send_data), DATATYPE,
                                        0x111337);
    /* The request is dereferenced below, so reject error pointers up front
     * (same check as the other tests in this file use for send requests) */
    ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));

    short_progress_loop();

    /* A sync send is expected to return a request which is not completed
     * before the receive side has matched the message */
    ASSERT_TRUE(my_send_req != NULL);
    EXPECT_FALSE(my_send_req->completed);

    ucp_worker_progress(receiver().worker());

    status = recv_b(&recv_data, sizeof(recv_data), DATATYPE, 0x1337, 0xffff, &info);
    ASSERT_UCS_OK(status);

    EXPECT_EQ(sizeof(send_data), info.length);
    EXPECT_EQ((ucp_tag_t)0x111337, info.sender_tag);
    EXPECT_EQ(send_data, recv_data);

    /* Give the sender a chance to observe that the message was received */
    short_progress_loop();

    EXPECT_TRUE(my_send_req->completed);
    EXPECT_EQ(UCS_OK, my_send_req->status);
    request_release(my_send_req);
}
452
/* Instantiate the test_ucp_tag_match tests; the set of configurations they
 * run on is presumably determined by UCP_INSTANTIATE_TEST_CASE - see the
 * macro definition. */
UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_match)
454
455 class test_ucp_tag_match_rndv : public test_ucp_tag_match {
456 public:
457 enum {
458 RNDV_SCHEME_AUTO = 0,
459 RNDV_SCHEME_PUT_ZCOPY,
460 RNDV_SCHEME_GET_ZCOPY
461 };
462
463 static const std::string rndv_schemes[];
464
init()465 void init() {
466 ASSERT_LE(GetParam().variant, (int)RNDV_SCHEME_GET_ZCOPY);
467 modify_config("RNDV_SCHEME", rndv_schemes[GetParam().variant]);
468
469 test_ucp_tag_match::init();
470 }
471
472 std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)473 static enum_test_params(const ucp_params_t& ctx_params,
474 const std::string& name,
475 const std::string& test_case_name,
476 const std::string& tls)
477 {
478 std::vector<ucp_test_param> result;
479 generate_test_params_variant(ctx_params, name,
480 test_case_name + "/rndv_" +
481 rndv_schemes[RNDV_SCHEME_AUTO],
482 tls, RNDV_SCHEME_AUTO, result);
483 generate_test_params_variant(ctx_params, name,
484 test_case_name + "/rndv_" +
485 rndv_schemes[RNDV_SCHEME_PUT_ZCOPY],
486 tls, RNDV_SCHEME_PUT_ZCOPY, result);
487 generate_test_params_variant(ctx_params, name,
488 test_case_name + "/rndv_" +
489 rndv_schemes[RNDV_SCHEME_GET_ZCOPY],
490 tls, RNDV_SCHEME_GET_ZCOPY, result);
491 return result;
492 }
493 };
494
495 const std::string test_ucp_tag_match_rndv::rndv_schemes[] = { "auto",
496 "put_zcopy",
497 "get_zcopy" };
498
499 UCS_TEST_P(test_ucp_tag_match_rndv, sync_send_unexp, "RNDV_THRESH=1048576") {
500 static const size_t size = 1148576;
501 request *my_send_req;
502 ucp_tag_recv_info_t info;
503 ucs_status_t status;
504
505 std::vector<char> sendbuf(size, 0);
506 std::vector<char> recvbuf(size, 0);
507
508 ucs::fill_random(sendbuf);
509
510 /* sender - send the rts*/
511 my_send_req = send_sync_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
512 /* receiver - get the rts and put in unexpected */
513 short_progress_loop();
514
515 ASSERT_TRUE(my_send_req != NULL);
516 EXPECT_FALSE(my_send_req->completed);
517
518 /* receiver - issue a recv req, match the rts, perform rndv-get and send ats to sender */
519 status = recv_b(&recvbuf[0], recvbuf.size(), DATATYPE, 0x1337, 0xffff, &info);
520 ASSERT_UCS_OK(status);
521
522 EXPECT_EQ(sendbuf.size(), info.length);
523 EXPECT_EQ((ucp_tag_t)0x111337, info.sender_tag);
524 EXPECT_EQ(sendbuf, recvbuf);
525
526 /* sender - get the ATS and set send request to completed */
527 wait_for_flag(&my_send_req->completed);
528
529 EXPECT_TRUE(my_send_req->completed);
530 EXPECT_EQ(UCS_OK, my_send_req->status);
531 request_release(my_send_req);
532 }
533
534 UCS_TEST_P(test_ucp_tag_match_rndv, req_exp, "RNDV_THRESH=1048576") {
535 static const size_t size = 1148576;
536 request *my_send_req, *my_recv_req;
537
538 std::vector<char> sendbuf(size, 0);
539 std::vector<char> recvbuf(size, 0);
540
541 skip_loopback();
542
543 ucs::fill_random(sendbuf);
544
545 /* receiver - put the receive request into expected */
546 my_recv_req = recv_nb(&recvbuf[0], recvbuf.size(), DATATYPE, 0x1337, 0xffff);
547 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
548 EXPECT_FALSE(my_recv_req->completed);
549
550 /* sender - send the RTS */
551 my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
552 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
553
554 /* receiver - match the rts, perform rndv get and send an ack upon finishing */
555 short_progress_loop();
556 /* for UCTs that cannot perform real rndv and may do eager send-recv bcopy instead */
557 wait(my_recv_req);
558
559 EXPECT_EQ(sendbuf.size(), my_recv_req->info.length);
560 EXPECT_EQ((ucp_tag_t)0x111337, my_recv_req->info.sender_tag);
561 EXPECT_TRUE(my_recv_req->completed);
562 EXPECT_EQ(sendbuf, recvbuf);
563
564 wait_and_validate(my_send_req);
565 request_release(my_recv_req);
566 }
567
568 UCS_TEST_P(test_ucp_tag_match_rndv, rts_unexp, "RNDV_THRESH=1048576") {
569 static const size_t size = 1148576;
570 request *my_send_req;
571 ucp_tag_recv_info_t info;
572 ucs_status_t status;
573
574 std::vector<char> sendbuf(size, 0);
575 std::vector<char> recvbuf(size, 0);
576
577 skip_loopback();
578
579 ucs::fill_random(sendbuf);
580
581 /* sender - send the RTS */
582 my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
583 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
584
585 /* receiver - get the RTS and put it into unexpected */
586 short_progress_loop();
587
588 /* receiver - issue a receive request, match it with the RTS and perform rndv get */
589 status = recv_b(&recvbuf[0], recvbuf.size(), DATATYPE, 0x1337, 0xffff, &info);
590 ASSERT_UCS_OK(status);
591
592 /* sender - get the ATS and set send request to completed */
593 wait_and_validate(my_send_req);
594
595 EXPECT_EQ(sendbuf.size() , info.length);
596 EXPECT_EQ((ucp_tag_t)0x111337, info.sender_tag);
597 EXPECT_EQ(sendbuf, recvbuf);
598 }
599
600 UCS_TEST_P(test_ucp_tag_match_rndv, truncated, "RNDV_THRESH=1048576") {
601 static const size_t size = 1148576;
602 request *my_send_req;
603 ucp_tag_recv_info_t info;
604 ucs_status_t status;
605
606 std::vector<char> sendbuf(size, 0);
607
608 skip_loopback();
609
610 ucs::fill_random(sendbuf);
611
612 /* sender - send the RTS */
613 my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
614 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
615
616 /* receiver - get the RTS and put it into unexpected */
617 short_progress_loop();
618
619 /* receiver - issue a receive request with zero length,
620 * no assertions should occur */
621 status = recv_b(NULL, 0, DATATYPE, 0x1337, 0xffff, &info);
622 EXPECT_EQ(UCS_ERR_MESSAGE_TRUNCATED, status);
623
624 /* sender - get the ATS and set send request to completed */
625 wait_and_validate(my_send_req);
626 }
627
628 UCS_TEST_P(test_ucp_tag_match_rndv, post_larger_recv, "RNDV_THRESH=0") {
629 /* small send size should probably be lower than minimum GET Zcopy
630 * size supported by IB TLs */
631 static const size_t small_send_size = 16;
632 static const size_t small_recv_size = small_send_size * 2;
633 static const size_t large_send_size = 1148576;
634 static const size_t large_recv_size = large_send_size + 1 * UCS_KBYTE;
635 /* array of [send][recv] sizes */
636 static const size_t sizes[][2] = { { small_send_size, small_recv_size },
637 { large_send_size, large_recv_size } };
638 request *my_send_req, *my_recv_req;
639
640 for (unsigned i = 0; i < ucs_array_size(sizes); i++) {
641 size_t send_size = sizes[i][0];
642 size_t recv_size = sizes[i][1];
643 std::vector<char> sendbuf(send_size, 0);
644 std::vector<char> recvbuf(recv_size, 0);
645
646 ucs::fill_random(sendbuf);
647 ucs::fill_random(recvbuf);
648
649 my_recv_req = recv_nb(&recvbuf[0], recvbuf.size(), DATATYPE, 0x1337, 0xffff);
650 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
651 EXPECT_FALSE(my_recv_req->completed);
652
653 my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
654 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
655
656 wait(my_recv_req);
657
658 EXPECT_EQ(sendbuf.size(), my_recv_req->info.length);
659 EXPECT_EQ(recvbuf.size(), ((ucp_request_t*)my_recv_req - 1)->recv.length);
660 EXPECT_EQ((ucp_tag_t)0x111337, my_recv_req->info.sender_tag);
661 EXPECT_TRUE(my_recv_req->completed);
662 EXPECT_NE(sendbuf, recvbuf);
663 EXPECT_TRUE(std::equal(sendbuf.begin(), sendbuf.end(), recvbuf.begin()));
664
665 wait_and_validate(my_send_req);
666 request_release(my_recv_req);
667 }
668 }
669
670 UCS_TEST_P(test_ucp_tag_match_rndv, req_exp_auto_thresh, "RNDV_THRESH=auto") {
671 static const size_t size = 1148576;
672 request *my_send_req, *my_recv_req;
673
674 std::vector<char> sendbuf(size, 0);
675 std::vector<char> recvbuf(size, 0);
676
677 skip_loopback();
678
679 ucs::fill_random(sendbuf);
680
681 /* receiver - put the receive request into expected */
682 my_recv_req = recv_nb(&recvbuf[0], recvbuf.size(), DATATYPE, 0x1337, 0xffff);
683 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
684 EXPECT_FALSE(my_recv_req->completed);
685
686 /* sender - send the RTS */
687 my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
688 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
689
690 /* receiver - match the rts, perform rndv get and send an ack upon finishing */
691 short_progress_loop();
692 /* for UCTs that cannot perform real rndv and may do eager send-recv bcopy instead */
693 wait(my_recv_req);
694
695 EXPECT_EQ(sendbuf.size(), my_recv_req->info.length);
696 EXPECT_EQ((ucp_tag_t)0x111337, my_recv_req->info.sender_tag);
697 EXPECT_TRUE(my_recv_req->completed);
698 EXPECT_EQ(sendbuf, recvbuf);
699
700 /* sender - get the ATS and set send request to completed */
701 wait_and_validate(my_send_req);
702 request_release(my_recv_req);
703 }
704
/* Expected receives of three small messages followed by a huge one
 * (2500 MB before scaling). Every size is divided by
 * ucs::test_time_multiplier(). Each iteration posts the receive first,
 * sends with a non-blocking send and validates length, sender tag and data. */
UCS_TEST_P(test_ucp_tag_match_rndv, exp_huge_mix) {
    const size_t sizes[] = { 1000, 2000, 8000, 2500ul * UCS_MBYTE };

    /* small sizes should warm-up tag cache */
    for (unsigned i = 0; i < ucs_array_size(sizes); ++i) {
        const size_t size = sizes[i] / ucs::test_time_multiplier();
        request *my_send_req, *my_recv_req;

        std::vector<char> sendbuf(size, 0);
        std::vector<char> recvbuf(size, 0);

        ucs::fill_random(sendbuf);

        my_recv_req = recv_nb(&recvbuf[0], recvbuf.size(), DATATYPE, 0x1337, 0xffff);
        ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
        EXPECT_FALSE(my_recv_req->completed);

        my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
        ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));

        wait(my_recv_req);

        EXPECT_EQ(sendbuf.size(), my_recv_req->info.length);
        EXPECT_EQ((ucp_tag_t)0x111337, my_recv_req->info.sender_tag);
        EXPECT_TRUE(my_recv_req->completed);
        EXPECT_EQ(sendbuf, recvbuf);

        wait_and_validate(my_send_req);
        /* Use request_release() like the other rndv tests do for a completed
         * receive request; request_free() cannot be used for external
         * requests */
        request_release(my_recv_req);
    }
}
736
737 UCS_TEST_P(test_ucp_tag_match_rndv, bidir_multi_exp_post, "RNDV_THRESH=0") {
738 const size_t sizes[] = { 8 * UCS_KBYTE, 128 * UCS_KBYTE, 512 * UCS_KBYTE,
739 8 * UCS_MBYTE, 128 * UCS_MBYTE, 512 * UCS_MBYTE };
740
741 receiver().connect(&sender(), get_ep_params());
742
743 for (unsigned i = 0; i < ucs_array_size(sizes); ++i) {
744 const size_t size = sizes[i] /
745 ucs::test_time_multiplier() /
746 ucs::test_time_multiplier();
747 const size_t count = ucs_max((size_t)(5000.0 / sqrt(sizes[i]) /
748 ucs::test_time_multiplier()), 3lu);
749 std::vector<request*> sreqs;
750 std::vector<request*> rreqs;
751 std::vector<std::vector<char> > sbufs;
752 std::vector<std::vector<char> > rbufs;
753
754 sbufs.resize(count * 2);
755 rbufs.resize(count * 2);
756
757 for (size_t repeat = 0; repeat < count * 2; ++repeat) {
758 entity &send_e = repeat < count ? sender() : receiver();
759 entity &recv_e = repeat < count ? receiver() : sender();
760 request *my_send_req, *my_recv_req;
761
762 sbufs[repeat].resize(size, 0);
763 rbufs[repeat].resize(size, 0);
764 ucs::fill_random(sbufs[repeat]);
765
766 my_recv_req = recv(recv_e, RECV_NB,
767 &rbufs[repeat][0], rbufs[repeat].size(),
768 DATATYPE, 0x1337, 0xffff, NULL);
769 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
770 EXPECT_FALSE(my_recv_req->completed);
771
772 my_send_req = send(send_e, SEND_NB,
773 &sbufs[repeat][0], sbufs[repeat].size(),
774 DATATYPE, 0x111337);
775 ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
776
777 sreqs.push_back(my_send_req);
778 rreqs.push_back(my_recv_req);
779 }
780
781 for (size_t repeat = 0; repeat < count * 2; ++repeat) {
782 request *my_send_req, *my_recv_req;
783
784 my_recv_req = rreqs[repeat];
785 my_send_req = sreqs[repeat];
786
787 wait(my_recv_req);
788
789 EXPECT_EQ(sbufs[repeat].size(), my_recv_req->info.length);
790 EXPECT_EQ((ucp_tag_t)0x111337, my_recv_req->info.sender_tag);
791 EXPECT_TRUE(my_recv_req->completed);
792 EXPECT_EQ(sbufs[repeat], rbufs[repeat]);
793
794 wait_and_validate(my_send_req);
795 request_free(my_recv_req);
796 }
797 }
798 }
799
/* Instantiate the test_ucp_tag_match_rndv tests; the set of configurations
 * they run on is presumably determined by UCP_INSTANTIATE_TEST_CASE - see the
 * macro definition. */
UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_match_rndv)
801