1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7 
8 #include "test_ucp_tag.h"
9 
10 #include <common/test_helpers.h>
11 
12 
13 class test_ucp_tag_probe : public test_ucp_tag {
14 public:
test_ucp_tag_probe()15     test_ucp_tag_probe() {
16         if (has_transport("tcp")) {
17             /* Decrease `TX_SEG_SIZE` and `RX_SEG_SIZE` parameters
18              * for TCP transport to be able fully consume receive
19              * buffer by 100-byte messages */
20             m_env.push_back(new ucs::scoped_setenv("UCX_TCP_TX_SEG_SIZE", "4k"));
21             m_env.push_back(new ucs::scoped_setenv("UCX_TCP_RX_SEG_SIZE", "4k"));
22         }
23     }
24 
25     /* The parameters mean the following:
26      *  - s_size and r_size: send and recv buffer sizes.
27      *    Can be different for checking message transaction error
28      *  - is_sync: specifies the type of send function to be used
29      *    (sync or not)
30      *  - is_recv_msg: specifies whether probe function needs to remove
31      *    matched message. If yes, then ucp_tag_msg_recv_nb is used for
32      *    receive
33      * */
test_send_probe(size_t s_size,size_t r_size,bool is_sync,int is_recv_msg)34     void test_send_probe (size_t s_size, size_t r_size, bool is_sync,
35                           int is_recv_msg) {
36         ucp_tag_recv_info_t info;
37         ucp_tag_message_h   message;
38         request             *send_req = NULL;
39         request             *recv_req = NULL;
40 
41         std::vector<char> sendbuf(s_size, 0);
42         std::vector<char> recvbuf(r_size, 0);
43 
44         ucs::fill_random(sendbuf);
45 
46         message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff,
47                                    is_recv_msg, &info);
48         EXPECT_TRUE(message == NULL);
49 
50         if (is_sync) {
51             send_req = send_sync_nb(&sendbuf[0], sendbuf.size(), DATATYPE,
52                                     0x111337);
53 
54         } else {
55             send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
56         }
57 
58         do {
59             progress();
60             message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff,
61                                        is_recv_msg, &info);
62         } while (message == NULL);
63 
64         EXPECT_EQ(sendbuf.size(),      info.length);
65         EXPECT_EQ((ucp_tag_t)0x111337, info.sender_tag);
66 
67         if (is_recv_msg == 0) {
68             recv_req = recv_nb(&recvbuf[0], recvbuf.size(), DATATYPE,
69                                0x1337, 0xffff);
70         } else {
71             recv_req = (request*)ucp_tag_msg_recv_nb(receiver().worker(),
72                                                      &recvbuf[0],recvbuf.size(),
73                                                      DATATYPE, message, recv_callback);
74             ASSERT_TRUE(!UCS_PTR_IS_ERR(recv_req));
75         }
76 
77         wait(recv_req);
78         EXPECT_TRUE(recv_req->completed);
79         if (s_size != r_size) {
80             /* Test for correct msg transaction handling */
81             EXPECT_EQ(UCS_ERR_MESSAGE_TRUNCATED, recv_req->status);
82         } else {
83             /* Everything should be received correctly */
84             EXPECT_EQ(UCS_OK,              recv_req->status);
85             EXPECT_EQ(sendbuf.size(),      recv_req->info.length);
86             EXPECT_EQ((ucp_tag_t)0x111337, recv_req->info.sender_tag);
87             EXPECT_EQ(sendbuf, recvbuf);
88         }
89         request_release(recv_req);
90 
91         if (UCS_PTR_IS_PTR(send_req)) {
92             wait(send_req);
93             EXPECT_TRUE(send_req->completed);
94             EXPECT_EQ(UCS_OK, send_req->status);
95             request_release(send_req);
96         }
97     }
98 
probe_all(std::string & recvbuf)99     int probe_all(std::string &recvbuf)
100     {
101         ucp_tag_message_h   message;
102         ucp_tag_recv_info_t info;
103         request             *req;
104 
105         int count = 0;
106         for (;;) {
107              message = ucp_tag_probe_nb(receiver().worker(), 0, 0, 1, &info);
108              if (message == NULL) {
109                  return count;
110              }
111 
112              req = (request*)ucp_tag_msg_recv_nb(receiver().worker(),
113                                                  &recvbuf[0], recvbuf.size(),
114                                                  DATATYPE, message, recv_callback);
115              wait(req);
116              request_release(req);
117              ++count;
118         }
119     }
120 };
121 
122 
UCS_TEST_P(test_ucp_tag_probe,send_probe)123 UCS_TEST_P(test_ucp_tag_probe, send_probe) {
124     test_send_probe (8, 8, false, 0);
125     test_send_probe (8, 8, true,  0);
126 }
127 
128 UCS_TEST_P(test_ucp_tag_probe, send_medium_msg_probe, "RNDV_THRESH=1048576") {
129     test_send_probe (50000, 50000, false, 1);
130     test_send_probe (50000, 50000, true,  1);
131 }
132 
133 UCS_TEST_P(test_ucp_tag_probe, send_medium_msg_probe_truncated, "RNDV_THRESH=1048576") {
134     test_send_probe (50000, 0, false, 1);
135     test_send_probe (50000, 0, true,  1);
136 }
137 
138 UCS_TEST_P(test_ucp_tag_probe, send_rndv_msg_probe, "RNDV_THRESH=1048576") {
139     static const size_t size = 1148576;
140     ucp_tag_recv_info_t info;
141     ucp_tag_message_h   message;
142     request             *my_send_req, *my_recv_req;
143 
144     skip_loopback();
145 
146     std::vector<char> sendbuf(size, 0);
147     std::vector<char> recvbuf(size, 0);
148 
149     ucs::fill_random(sendbuf);
150 
151     message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff, 1, &info);
152     EXPECT_TRUE(message == NULL);
153 
154     /* sender - send the RTS */
155     my_send_req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
156     ASSERT_TRUE(!UCS_PTR_IS_ERR(my_send_req));
157 
158     /* receiver - get the RTS and put it into unexpected */
159     wait_for_unexpected_msg(receiver().worker(), 10.0);
160 
161     /* receiver - match the rts, remove it from unexpected and return it */
162     message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff, 1, &info);
163     /* make sure that there was a match (RTS) */
164     ASSERT_TRUE(message != NULL);
165     EXPECT_EQ(sendbuf.size(),      info.length);
166     EXPECT_EQ((ucp_tag_t)0x111337, info.sender_tag);
167 
168     /* receiver - process the rts and schedule a get operation */
169     my_recv_req = (request*)ucp_tag_msg_recv_nb(receiver().worker(), &recvbuf[0],
170                                                 recvbuf.size(), DATATYPE, message,
171                                                 recv_callback);
172     ASSERT_TRUE(!UCS_PTR_IS_ERR(my_recv_req));
173 
174     /* receiver - perform rndv get and send the ATS */
175     wait(my_recv_req);
176     EXPECT_TRUE(my_recv_req->completed);
177 
178     /* sender - get the ATS and set send request to completed */
179     short_progress_loop();
180 
181     EXPECT_EQ(UCS_OK,              my_recv_req->status);
182     EXPECT_EQ(sendbuf.size(),      my_recv_req->info.length);
183     EXPECT_EQ((ucp_tag_t)0x111337, my_recv_req->info.sender_tag);
184     EXPECT_EQ(sendbuf, recvbuf);
185 
186     wait_and_validate(my_send_req);
187     request_release(my_recv_req);
188 }
189 
190 UCS_TEST_P(test_ucp_tag_probe, send_2_msg_probe, "RNDV_THRESH=inf") {
191     const ucp_datatype_t DT_INT = ucp_dt_make_contig(sizeof(int));
192     const ucp_tag_t      TAG    = 0xaaa;
193     const size_t         COUNT  = 20000;
194     std::vector<request*> reqs;
195 
196     /*
197      * send in order: 1, 2
198      */
199     std::vector<int> sdata1(COUNT, 1);
200     std::vector<int> sdata2(COUNT, 2);
201     request *sreq1 = send_nb(&sdata1[0], COUNT, DT_INT, TAG);
202     if (sreq1 != NULL) {
203         reqs.push_back(sreq1);
204     }
205     request *sreq2 = send_nb(&sdata2[0], COUNT, DT_INT, TAG);
206     if (sreq2 != NULL) {
207         reqs.push_back(sreq2);
208     }
209 
210     /*
211      * probe in order: 1, 2
212      */
213     ucp_tag_message_h   message1, message2;
214     ucp_tag_recv_info_t info;
215     do {
216         progress();
217         message1 = ucp_tag_probe_nb(receiver().worker(), TAG, 0xffff, 1, &info);
218     } while (message1 == NULL);
219     do {
220         progress();
221         message2 = ucp_tag_probe_nb(receiver().worker(), TAG, 0xffff, 1, &info);
222     } while (message2 == NULL);
223 
224     /*
225      * receive in **reverse** order: 2, 1
226      */
227     std::vector<int> rdata2(COUNT);
228     request *rreq2 = (request*)ucp_tag_msg_recv_nb(receiver().worker(), &rdata2[0],
229                                                    COUNT, DT_INT, message2,
230                                                    recv_callback);
231     reqs.push_back(rreq2);
232     ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq2));
233     wait(rreq2);
234 
235     std::vector<int> rdata1(COUNT);
236     request *rreq1 = (request*)ucp_tag_msg_recv_nb(receiver().worker(), &rdata1[0],
237                                                    COUNT, DT_INT, message1,
238                                                    recv_callback);
239     reqs.push_back(rreq1);
240     ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq1));
241     wait(rreq1);
242 
243     if (sreq1 != NULL) {
244         wait(sreq1);
245     }
246     if (sreq2 != NULL) {
247         wait(sreq2);
248     }
249 
250     /*
251      * expect data to arrive in probe order (rather than recv order)
252      */
253     EXPECT_EQ(sdata1, rdata1);
254     EXPECT_EQ(sdata2, rdata2);
255     while (!reqs.empty()) {
256         request *req = reqs.back();
257         EXPECT_TRUE(req->completed);
258         EXPECT_EQ(UCS_OK, req->status);
259         request_release(req);
260         reqs.pop_back();
261     }
262 }
263 
UCS_TEST_P(test_ucp_tag_probe,limited_probe_size)264 UCS_TEST_P(test_ucp_tag_probe, limited_probe_size) {
265     static const int COUNT = 1000;
266     std::string sendbuf, recvbuf;
267     std::vector<request*> reqs;
268     ucp_tag_recv_info_t info;
269     request *req;
270     int recvd;
271 
272     skip_loopback();
273 
274     sendbuf.resize(100, '1');
275     recvbuf.resize(100, '0');
276 
277     send_b(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
278     recv_b(&recvbuf[0], recvbuf.size(), DATATYPE, 0x111337, 0xffffff, &info);
279 
280     /* send 1000 messages without calling progress */
281     for (int i = 0; i < COUNT; ++i) {
282         req = send_nb(&sendbuf[0], sendbuf.size(), DATATYPE, 0x111337);
283         if (req != NULL) {
284             reqs.push_back(req);
285         }
286 
287         sender().progress(); /* progress only the sender */
288     }
289 
290     for (int i = 0; i < 1000; ++i) {
291         ucs::safe_usleep(1000);
292         sender().progress();
293     }
294 
295     /* progress once */
296     ucp_worker_progress(receiver().worker());
297 
298     /* probe should not have too many messages here because we poll once */
299     recvd = probe_all(recvbuf);
300     EXPECT_LE(recvd, 128);
301 
302     /* receive all the rest */
303     while (recvd < COUNT) {
304         progress();
305         recvd += probe_all(recvbuf);
306     }
307 
308     while (!reqs.empty()) {
309         wait(reqs.back());
310         request_release(reqs.back());
311         reqs.pop_back();
312     }
313 }
314 UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_probe)
315