1 /**
2  * Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
3  * Copyright (c) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4  * Copyright (C) Los Alamos National Security, LLC. 2018. ALL RIGHTS RESERVED.
5  *
6  */
7 #include <list>
8 #include <numeric>
9 #include <set>
10 #include <vector>
11 #include <math.h>
12 
13 #include "ucp_datatype.h"
14 #include "ucp_test.h"
15 
16 #define NUM_MESSAGES 17
17 
18 #define UCP_REALLOC_ID 1000
19 #define UCP_SEND_ID 0
20 #define UCP_REPLY_ID 1
21 #define UCP_RELEASE 1
22 
23 class test_ucp_am_base : public ucp_test {
24 public:
25     int sent_ams;
26     int replies;
27     int recv_ams;
28     void *reply;
29     void *for_release[NUM_MESSAGES];
30     int release;
31 
get_ctx_params()32     static ucp_params_t get_ctx_params() {
33         ucp_params_t params = ucp_test::get_ctx_params();
34         params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
35         params.features     = UCP_FEATURE_AM;
36         return params;
37     }
38 
39     static ucs_status_t ucp_process_am_cb(void *arg, void *data,
40                                           size_t length,
41                                           ucp_ep_h reply_ep,
42                                           unsigned flags);
43 
44     static ucs_status_t ucp_process_reply_cb(void *arg, void *data,
45                                              size_t length,
46                                              ucp_ep_h reply_ep,
47                                              unsigned flags);
48 
49     ucs_status_t am_handler(test_ucp_am_base *me, void *data,
50                             size_t  length, unsigned flags);
51 };
52 
ucp_process_reply_cb(void * arg,void * data,size_t length,ucp_ep_h reply_ep,unsigned flags)53 ucs_status_t test_ucp_am_base::ucp_process_reply_cb(void *arg, void *data,
54                                                     size_t length,
55                                                     ucp_ep_h reply_ep,
56                                                     unsigned flags)
57 {
58     test_ucp_am_base *self = reinterpret_cast<test_ucp_am_base*>(arg);
59     self->replies++;
60     return UCS_OK;
61 }
62 
ucp_process_am_cb(void * arg,void * data,size_t length,ucp_ep_h reply_ep,unsigned flags)63 ucs_status_t test_ucp_am_base::ucp_process_am_cb(void *arg, void *data,
64                                                  size_t length,
65                                                  ucp_ep_h reply_ep,
66                                                  unsigned flags)
67 {
68     test_ucp_am_base *self = reinterpret_cast<test_ucp_am_base*>(arg);
69 
70     if (reply_ep) {
71         self->reply = ucp_am_send_nb(reply_ep, UCP_REPLY_ID, NULL, 1,
72                                      ucp_dt_make_contig(0),
73                                      (ucp_send_callback_t) ucs_empty_function,
74                                      0);
75         EXPECT_FALSE(UCS_PTR_IS_ERR(self->reply));
76     }
77 
78     return self->am_handler(self, data, length, flags);
79 }
80 
am_handler(test_ucp_am_base * me,void * data,size_t length,unsigned flags)81 ucs_status_t test_ucp_am_base::am_handler(test_ucp_am_base *me, void *data,
82                                           size_t length, unsigned flags)
83 {
84     ucs_status_t status;
85     std::vector<char> cmp(length, (char)length);
86     std::vector<char> databuf(length, 'r');
87 
88     memcpy(&databuf[0], data, length);
89 
90     EXPECT_EQ(cmp, databuf);
91 
92     bool has_desc = flags & UCP_CB_PARAM_FLAG_DATA;
93     if (me->release) {
94         me->for_release[me->recv_ams] = has_desc ? data : NULL;
95         status                        = has_desc ? UCS_INPROGRESS : UCS_OK;
96     } else {
97         status                        = UCS_OK;
98     }
99 
100     me->recv_ams++;
101     return status;
102 }
103 
104 class test_ucp_am : public test_ucp_am_base {
105 public:
get_ep_params()106     ucp_ep_params_t get_ep_params() {
107         ucp_ep_params_t params = test_ucp_am_base::get_ep_params();
108         params.field_mask     |= UCP_EP_PARAM_FIELD_FLAGS;
109         params.flags          |= UCP_EP_PARAMS_FLAGS_NO_LOOPBACK;
110         return params;
111     }
112 
init()113     virtual void init() {
114         modify_config("MAX_EAGER_LANES", "2");
115 
116         ucp_test::init();
117         sender().connect(&receiver(), get_ep_params());
118         receiver().connect(&sender(), get_ep_params());
119     }
120 
121 protected:
122     void do_set_am_handler_realloc_test();
123     void do_send_process_data_test(int test_release, uint16_t am_id,
124                                    int send_reply);
125     void do_send_process_data_iov_test(size_t size);
126     void set_handlers(uint16_t am_id);
127     void set_reply_handlers();
128 };
129 
set_reply_handlers()130 void test_ucp_am::set_reply_handlers()
131 {
132     ucp_worker_set_am_handler(sender().worker(), UCP_REPLY_ID,
133                               ucp_process_reply_cb, this,
134                               UCP_AM_FLAG_WHOLE_MSG);
135     ucp_worker_set_am_handler(receiver().worker(), UCP_REPLY_ID,
136                               ucp_process_reply_cb, this,
137                               UCP_AM_FLAG_WHOLE_MSG);
138 }
139 
set_handlers(uint16_t am_id)140 void test_ucp_am::set_handlers(uint16_t am_id)
141 {
142     ucp_worker_set_am_handler(sender().worker(), am_id,
143                               ucp_process_am_cb, this,
144                               UCP_AM_FLAG_WHOLE_MSG);
145     ucp_worker_set_am_handler(receiver().worker(), am_id,
146                               ucp_process_am_cb, this,
147                               UCP_AM_FLAG_WHOLE_MSG);
148 }
149 
do_send_process_data_test(int test_release,uint16_t am_id,int send_reply)150 void test_ucp_am::do_send_process_data_test(int test_release, uint16_t am_id,
151                                             int send_reply)
152 {
153     size_t buf_size          = pow(2, NUM_MESSAGES - 2);
154     ucs_status_ptr_t sstatus = NULL;
155     std::vector<char> buf(buf_size);
156 
157     recv_ams      = 0;
158     sent_ams      = 0;
159     replies       = 0;
160     this->release = test_release;
161 
162     for (size_t i = 0; i < buf_size + 1; i = i ? (i * 2) : 1) {
163         for (size_t j = 0; j < i; j++) {
164             buf[j] = i;
165         }
166 
167         reply   = NULL;
168         sstatus = ucp_am_send_nb(sender().ep(), am_id,
169                                  buf.data(), 1, ucp_dt_make_contig(i),
170                                  (ucp_send_callback_t) ucs_empty_function,
171                                  send_reply);
172 
173         EXPECT_FALSE(UCS_PTR_IS_ERR(sstatus));
174         wait(sstatus);
175         sent_ams++;
176 
177         if (send_reply) {
178             while (sent_ams != replies) {
179                 progress();
180             }
181 
182             if (reply != NULL) {
183                 ucp_request_release(reply);
184             }
185         }
186     }
187 
188     while (sent_ams != recv_ams) {
189         progress();
190     }
191 
192     if (send_reply) {
193         while (sent_ams != replies) {
194             progress();
195         }
196     }
197 
198     if (test_release) {
199         for(int i = 0; i < recv_ams; i++) {
200             if (for_release[i] != NULL) {
201                 ucp_am_data_release(receiver().worker(), for_release[i]);
202             }
203         }
204     }
205 }
206 
do_send_process_data_iov_test(size_t size)207 void test_ucp_am::do_send_process_data_iov_test(size_t size)
208 {
209     ucs_status_ptr_t sstatus;
210     size_t index;
211     size_t i;
212 
213     recv_ams = 0;
214     sent_ams = 0;
215     release  = 0;
216 
217     const size_t iovcnt = 2;
218     std::vector<char> sendbuf(size * iovcnt, 0);
219 
220     ucs::fill_random(sendbuf);
221 
222     set_handlers(0);
223 
224     for (i = 1; i < size; i *= 2) {
225         for (size_t iov_it = 0; iov_it < iovcnt; iov_it++) {
226             for (index = 0; index < i; index++) {
227                 sendbuf[(iov_it * i) + index] = i * 2;
228             }
229         }
230 
231         ucp::data_type_desc_t send_dt_desc(DATATYPE_IOV, sendbuf.data(),
232                                            i * iovcnt, iovcnt);
233 
234         sstatus = ucp_am_send_nb(sender().ep(), 0,
235                                  send_dt_desc.buf(), iovcnt, DATATYPE_IOV,
236                                  (ucp_send_callback_t) ucs_empty_function, 0);
237         wait(sstatus);
238         EXPECT_FALSE(UCS_PTR_IS_ERR(sstatus));
239         sent_ams++;
240     }
241 
242     while (sent_ams != recv_ams) {
243         progress();
244     }
245 }
246 
do_set_am_handler_realloc_test()247 void test_ucp_am::do_set_am_handler_realloc_test()
248 {
249     set_handlers(UCP_SEND_ID);
250     do_send_process_data_test(0, UCP_SEND_ID, 0);
251 
252     set_handlers(UCP_REALLOC_ID);
253     do_send_process_data_test(0, UCP_REALLOC_ID, 0);
254 
255     set_handlers(UCP_SEND_ID + 1);
256     do_send_process_data_test(0, UCP_SEND_ID + 1, 0);
257 }
258 
UCS_TEST_P(test_ucp_am,send_process_am)259 UCS_TEST_P(test_ucp_am, send_process_am)
260 {
261     set_handlers(UCP_SEND_ID);
262     do_send_process_data_test(0, UCP_SEND_ID, 0);
263 
264     set_reply_handlers();
265     do_send_process_data_test(0, UCP_SEND_ID, UCP_AM_SEND_REPLY);
266 }
267 
UCS_TEST_P(test_ucp_am,send_process_am_release)268 UCS_TEST_P(test_ucp_am, send_process_am_release)
269 {
270     set_handlers(UCP_SEND_ID);
271     do_send_process_data_test(UCP_RELEASE, 0, 0);
272 }
273 
UCS_TEST_P(test_ucp_am,send_process_iov_am)274 UCS_TEST_P(test_ucp_am, send_process_iov_am)
275 {
276     ucs::detail::message_stream ms("INFO");
277 
278     for (unsigned i = 1; i <= 7; ++i) {
279         size_t max = (long)pow(10.0, i);
280         long count = ucs_max((long)(5000.0 / sqrt(max) /
281                                     ucs::test_time_multiplier()), 3);
282         ms << count << "x10^" << i << " " << std::flush;
283         for (long j = 0; j < count; ++j) {
284             size_t size = ucs::rand() % max + 1;
285             do_send_process_data_iov_test(size);
286         }
287     }
288 }
289 
UCS_TEST_P(test_ucp_am,set_am_handler_realloc)290 UCS_TEST_P(test_ucp_am, set_am_handler_realloc)
291 {
292     do_set_am_handler_realloc_test();
293 }
294 
295 UCP_INSTANTIATE_TEST_CASE(test_ucp_am)
296