1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
3 * Copyright (c) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 * Copyright (C) Los Alamos National Security, LLC. 2018. ALL RIGHTS RESERVED.
5 *
6 */
7 #include <list>
8 #include <numeric>
9 #include <set>
10 #include <vector>
11 #include <math.h>
12
13 #include "ucp_datatype.h"
14 #include "ucp_test.h"
15
16 #define NUM_MESSAGES 17
17
18 #define UCP_REALLOC_ID 1000
19 #define UCP_SEND_ID 0
20 #define UCP_REPLY_ID 1
21 #define UCP_RELEASE 1
22
23 class test_ucp_am_base : public ucp_test {
24 public:
25 int sent_ams;
26 int replies;
27 int recv_ams;
28 void *reply;
29 void *for_release[NUM_MESSAGES];
30 int release;
31
get_ctx_params()32 static ucp_params_t get_ctx_params() {
33 ucp_params_t params = ucp_test::get_ctx_params();
34 params.field_mask |= UCP_PARAM_FIELD_FEATURES;
35 params.features = UCP_FEATURE_AM;
36 return params;
37 }
38
39 static ucs_status_t ucp_process_am_cb(void *arg, void *data,
40 size_t length,
41 ucp_ep_h reply_ep,
42 unsigned flags);
43
44 static ucs_status_t ucp_process_reply_cb(void *arg, void *data,
45 size_t length,
46 ucp_ep_h reply_ep,
47 unsigned flags);
48
49 ucs_status_t am_handler(test_ucp_am_base *me, void *data,
50 size_t length, unsigned flags);
51 };
52
ucp_process_reply_cb(void * arg,void * data,size_t length,ucp_ep_h reply_ep,unsigned flags)53 ucs_status_t test_ucp_am_base::ucp_process_reply_cb(void *arg, void *data,
54 size_t length,
55 ucp_ep_h reply_ep,
56 unsigned flags)
57 {
58 test_ucp_am_base *self = reinterpret_cast<test_ucp_am_base*>(arg);
59 self->replies++;
60 return UCS_OK;
61 }
62
ucp_process_am_cb(void * arg,void * data,size_t length,ucp_ep_h reply_ep,unsigned flags)63 ucs_status_t test_ucp_am_base::ucp_process_am_cb(void *arg, void *data,
64 size_t length,
65 ucp_ep_h reply_ep,
66 unsigned flags)
67 {
68 test_ucp_am_base *self = reinterpret_cast<test_ucp_am_base*>(arg);
69
70 if (reply_ep) {
71 self->reply = ucp_am_send_nb(reply_ep, UCP_REPLY_ID, NULL, 1,
72 ucp_dt_make_contig(0),
73 (ucp_send_callback_t) ucs_empty_function,
74 0);
75 EXPECT_FALSE(UCS_PTR_IS_ERR(self->reply));
76 }
77
78 return self->am_handler(self, data, length, flags);
79 }
80
am_handler(test_ucp_am_base * me,void * data,size_t length,unsigned flags)81 ucs_status_t test_ucp_am_base::am_handler(test_ucp_am_base *me, void *data,
82 size_t length, unsigned flags)
83 {
84 ucs_status_t status;
85 std::vector<char> cmp(length, (char)length);
86 std::vector<char> databuf(length, 'r');
87
88 memcpy(&databuf[0], data, length);
89
90 EXPECT_EQ(cmp, databuf);
91
92 bool has_desc = flags & UCP_CB_PARAM_FLAG_DATA;
93 if (me->release) {
94 me->for_release[me->recv_ams] = has_desc ? data : NULL;
95 status = has_desc ? UCS_INPROGRESS : UCS_OK;
96 } else {
97 status = UCS_OK;
98 }
99
100 me->recv_ams++;
101 return status;
102 }
103
104 class test_ucp_am : public test_ucp_am_base {
105 public:
get_ep_params()106 ucp_ep_params_t get_ep_params() {
107 ucp_ep_params_t params = test_ucp_am_base::get_ep_params();
108 params.field_mask |= UCP_EP_PARAM_FIELD_FLAGS;
109 params.flags |= UCP_EP_PARAMS_FLAGS_NO_LOOPBACK;
110 return params;
111 }
112
init()113 virtual void init() {
114 modify_config("MAX_EAGER_LANES", "2");
115
116 ucp_test::init();
117 sender().connect(&receiver(), get_ep_params());
118 receiver().connect(&sender(), get_ep_params());
119 }
120
121 protected:
122 void do_set_am_handler_realloc_test();
123 void do_send_process_data_test(int test_release, uint16_t am_id,
124 int send_reply);
125 void do_send_process_data_iov_test(size_t size);
126 void set_handlers(uint16_t am_id);
127 void set_reply_handlers();
128 };
129
set_reply_handlers()130 void test_ucp_am::set_reply_handlers()
131 {
132 ucp_worker_set_am_handler(sender().worker(), UCP_REPLY_ID,
133 ucp_process_reply_cb, this,
134 UCP_AM_FLAG_WHOLE_MSG);
135 ucp_worker_set_am_handler(receiver().worker(), UCP_REPLY_ID,
136 ucp_process_reply_cb, this,
137 UCP_AM_FLAG_WHOLE_MSG);
138 }
139
set_handlers(uint16_t am_id)140 void test_ucp_am::set_handlers(uint16_t am_id)
141 {
142 ucp_worker_set_am_handler(sender().worker(), am_id,
143 ucp_process_am_cb, this,
144 UCP_AM_FLAG_WHOLE_MSG);
145 ucp_worker_set_am_handler(receiver().worker(), am_id,
146 ucp_process_am_cb, this,
147 UCP_AM_FLAG_WHOLE_MSG);
148 }
149
do_send_process_data_test(int test_release,uint16_t am_id,int send_reply)150 void test_ucp_am::do_send_process_data_test(int test_release, uint16_t am_id,
151 int send_reply)
152 {
153 size_t buf_size = pow(2, NUM_MESSAGES - 2);
154 ucs_status_ptr_t sstatus = NULL;
155 std::vector<char> buf(buf_size);
156
157 recv_ams = 0;
158 sent_ams = 0;
159 replies = 0;
160 this->release = test_release;
161
162 for (size_t i = 0; i < buf_size + 1; i = i ? (i * 2) : 1) {
163 for (size_t j = 0; j < i; j++) {
164 buf[j] = i;
165 }
166
167 reply = NULL;
168 sstatus = ucp_am_send_nb(sender().ep(), am_id,
169 buf.data(), 1, ucp_dt_make_contig(i),
170 (ucp_send_callback_t) ucs_empty_function,
171 send_reply);
172
173 EXPECT_FALSE(UCS_PTR_IS_ERR(sstatus));
174 wait(sstatus);
175 sent_ams++;
176
177 if (send_reply) {
178 while (sent_ams != replies) {
179 progress();
180 }
181
182 if (reply != NULL) {
183 ucp_request_release(reply);
184 }
185 }
186 }
187
188 while (sent_ams != recv_ams) {
189 progress();
190 }
191
192 if (send_reply) {
193 while (sent_ams != replies) {
194 progress();
195 }
196 }
197
198 if (test_release) {
199 for(int i = 0; i < recv_ams; i++) {
200 if (for_release[i] != NULL) {
201 ucp_am_data_release(receiver().worker(), for_release[i]);
202 }
203 }
204 }
205 }
206
do_send_process_data_iov_test(size_t size)207 void test_ucp_am::do_send_process_data_iov_test(size_t size)
208 {
209 ucs_status_ptr_t sstatus;
210 size_t index;
211 size_t i;
212
213 recv_ams = 0;
214 sent_ams = 0;
215 release = 0;
216
217 const size_t iovcnt = 2;
218 std::vector<char> sendbuf(size * iovcnt, 0);
219
220 ucs::fill_random(sendbuf);
221
222 set_handlers(0);
223
224 for (i = 1; i < size; i *= 2) {
225 for (size_t iov_it = 0; iov_it < iovcnt; iov_it++) {
226 for (index = 0; index < i; index++) {
227 sendbuf[(iov_it * i) + index] = i * 2;
228 }
229 }
230
231 ucp::data_type_desc_t send_dt_desc(DATATYPE_IOV, sendbuf.data(),
232 i * iovcnt, iovcnt);
233
234 sstatus = ucp_am_send_nb(sender().ep(), 0,
235 send_dt_desc.buf(), iovcnt, DATATYPE_IOV,
236 (ucp_send_callback_t) ucs_empty_function, 0);
237 wait(sstatus);
238 EXPECT_FALSE(UCS_PTR_IS_ERR(sstatus));
239 sent_ams++;
240 }
241
242 while (sent_ams != recv_ams) {
243 progress();
244 }
245 }
246
do_set_am_handler_realloc_test()247 void test_ucp_am::do_set_am_handler_realloc_test()
248 {
249 set_handlers(UCP_SEND_ID);
250 do_send_process_data_test(0, UCP_SEND_ID, 0);
251
252 set_handlers(UCP_REALLOC_ID);
253 do_send_process_data_test(0, UCP_REALLOC_ID, 0);
254
255 set_handlers(UCP_SEND_ID + 1);
256 do_send_process_data_test(0, UCP_SEND_ID + 1, 0);
257 }
258
UCS_TEST_P(test_ucp_am,send_process_am)259 UCS_TEST_P(test_ucp_am, send_process_am)
260 {
261 set_handlers(UCP_SEND_ID);
262 do_send_process_data_test(0, UCP_SEND_ID, 0);
263
264 set_reply_handlers();
265 do_send_process_data_test(0, UCP_SEND_ID, UCP_AM_SEND_REPLY);
266 }
267
UCS_TEST_P(test_ucp_am,send_process_am_release)268 UCS_TEST_P(test_ucp_am, send_process_am_release)
269 {
270 set_handlers(UCP_SEND_ID);
271 do_send_process_data_test(UCP_RELEASE, 0, 0);
272 }
273
UCS_TEST_P(test_ucp_am,send_process_iov_am)274 UCS_TEST_P(test_ucp_am, send_process_iov_am)
275 {
276 ucs::detail::message_stream ms("INFO");
277
278 for (unsigned i = 1; i <= 7; ++i) {
279 size_t max = (long)pow(10.0, i);
280 long count = ucs_max((long)(5000.0 / sqrt(max) /
281 ucs::test_time_multiplier()), 3);
282 ms << count << "x10^" << i << " " << std::flush;
283 for (long j = 0; j < count; ++j) {
284 size_t size = ucs::rand() % max + 1;
285 do_send_process_data_iov_test(size);
286 }
287 }
288 }
289
UCS_TEST_P(test_ucp_am,set_am_handler_realloc)290 UCS_TEST_P(test_ucp_am, set_am_handler_realloc)
291 {
292 do_set_am_handler_realloc_test();
293 }
294
295 UCP_INSTANTIATE_TEST_CASE(test_ucp_am)
296