1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 extern "C" {
8 #include <uct/api/uct.h>
9 }
10 #include <common/test.h>
11 #include "uct_test.h"
12
13 #define UCT_TAG_INSTANTIATE_TEST_CASE(_test_case) \
14 _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_mlx5) \
15 _UCT_INSTANTIATE_TEST_CASE(_test_case, dc_mlx5)
16
17 class test_tag : public uct_test {
18 public:
19 static const uint64_t SEND_SEED = 0xa1a1a1a1a1a1a1a1ul;
20 static const uint64_t RECV_SEED = 0xb2b2b2b2b2b2b2b2ul;
21 static const uint64_t MASK = 0xfffffffffffffffful;
22
23 struct rndv_hdr {
24 uint64_t priv[2];
25 uint16_t tail;
26 } UCS_S_PACKED;
27
28 struct recv_ctx {
29 mapped_buffer *mbuf;
30 uct_tag_t tag;
31 uct_tag_t tmask;
32 bool take_uct_desc;
33 bool comp;
34 bool unexp;
35 bool consumed;
36 bool sw_rndv;
37 uct_tag_context_t uct_ctx;
38 ucs_status_t status;
39 };
40
41 struct send_ctx {
42 mapped_buffer *mbuf;
43 void *rndv_op;
44 uct_tag_t tag;
45 uint64_t imm_data;
46 uct_completion_t uct_comp;
47 ucs_status_t status;
48 bool sw_rndv;
49 bool comp;
50 bool unexp;
51 };
52
53 typedef ucs_status_t (test_tag::*send_func)(entity&, send_ctx&);
54
init()55 void init()
56 {
57 ucs_status_t status = uct_config_modify(m_iface_config,
58 "RC_TM_ENABLE", "y");
59 ASSERT_TRUE((status == UCS_OK) || (status == UCS_ERR_NO_ELEM));
60
61 status = uct_config_modify(m_iface_config, "RC_TM_MP_SRQ_ENABLE", "no");
62 ASSERT_TRUE((status == UCS_OK) || (status == UCS_ERR_NO_ELEM));
63
64 uct_test::init();
65
66 entity *sender = uct_test::create_entity(0ul, NULL, unexp_eager,
67 unexp_rndv,
68 reinterpret_cast<void*>(this),
69 reinterpret_cast<void*>(this));
70 m_entities.push_back(sender);
71
72 check_skip_test();
73
74 if (UCT_DEVICE_TYPE_SELF == GetParam()->dev_type) {
75 sender->connect(0, *sender, 0);
76 } else {
77 entity *receiver = uct_test::create_entity(0ul, NULL, unexp_eager,
78 unexp_rndv,
79 reinterpret_cast<void*>(this),
80 reinterpret_cast<void*>(this));
81 m_entities.push_back(receiver);
82
83 sender->connect(0, *receiver, 0);
84 }
85 }
86
init_send_ctx(send_ctx & s,mapped_buffer * b,uct_tag_t t,uint64_t i,bool unexp_flow=true)87 void init_send_ctx(send_ctx &s,mapped_buffer *b, uct_tag_t t, uint64_t i,
88 bool unexp_flow = true)
89 {
90 s.mbuf = b;
91 s.rndv_op = NULL;
92 s.tag = t;
93 s.imm_data = i;
94 s.uct_comp.count = 1;
95 s.uct_comp.func = send_completion;
96 s.sw_rndv = s.comp = false;
97 s.unexp = unexp_flow;
98 s.status = UCS_ERR_NO_PROGRESS;
99 }
100
init_recv_ctx(recv_ctx & r,mapped_buffer * b,uct_tag_t t,uct_tag_t m=MASK,bool uct_d=false)101 void init_recv_ctx(recv_ctx &r, mapped_buffer *b, uct_tag_t t,
102 uct_tag_t m = MASK, bool uct_d = false)
103 {
104 r.mbuf = b;
105 r.tag = t;
106 r.tmask = m;
107 r.uct_ctx.completed_cb = completed;
108 r.uct_ctx.tag_consumed_cb = tag_consumed;
109 r.uct_ctx.rndv_cb = sw_rndv_completed;
110 r.take_uct_desc = uct_d;
111 r.status = UCS_ERR_NO_PROGRESS;
112 r.comp = r.unexp = r.consumed = r.sw_rndv = false;
113 }
114
tag_eager_short(entity & e,send_ctx & ctx)115 ucs_status_t tag_eager_short(entity &e, send_ctx &ctx)
116 {
117 ctx.status = uct_ep_tag_eager_short(e.ep(0), ctx.tag, ctx.mbuf->ptr(),
118 ctx.mbuf->length());
119 ctx.comp = true;
120
121 return ctx.status;
122 }
123
tag_eager_bcopy(entity & e,send_ctx & ctx)124 ucs_status_t tag_eager_bcopy(entity &e, send_ctx &ctx)
125 {
126 ssize_t status = uct_ep_tag_eager_bcopy(e.ep(0), ctx.tag,
127 ctx.imm_data, mapped_buffer::pack,
128 reinterpret_cast<void*>(ctx.mbuf),
129 0);
130 ctx.status = (status >= 0) ? UCS_OK : static_cast<ucs_status_t>(status);
131 ctx.comp = true;
132
133 return ctx.status;
134 }
135
tag_eager_zcopy(entity & e,send_ctx & ctx)136 ucs_status_t tag_eager_zcopy(entity &e, send_ctx &ctx)
137 {
138 UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, ctx.mbuf->ptr(),
139 ctx.mbuf->length(), ctx.mbuf->memh(),
140 e.iface_attr().cap.tag.eager.max_iov);
141
142 ucs_status_t status = uct_ep_tag_eager_zcopy(e.ep(0), ctx.tag,
143 ctx.imm_data, iov, iovcnt,
144 0, &ctx.uct_comp);
145 if (status == UCS_INPROGRESS) {
146 status = UCS_OK;
147 }
148 return status;
149 }
150
tag_rndv_zcopy(entity & e,send_ctx & ctx)151 ucs_status_t tag_rndv_zcopy(entity &e, send_ctx &ctx)
152 {
153 rndv_hdr hdr = {{ctx.imm_data,
154 reinterpret_cast<uint64_t>(&ctx)
155 },
156 0xFAFA
157 };
158
159 UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, ctx.mbuf->ptr(),
160 ctx.mbuf->length(), ctx.mbuf->memh(), 1);
161
162 ctx.rndv_op = uct_ep_tag_rndv_zcopy(e.ep(0), ctx.tag, &hdr,
163 sizeof(hdr), iov, iovcnt, 0,
164 &ctx.uct_comp);
165
166 return (UCS_PTR_IS_ERR(ctx.rndv_op)) ? UCS_PTR_STATUS(ctx.rndv_op) :
167 UCS_OK;
168 }
169
tag_rndv_cancel(entity & e,void * op)170 ucs_status_t tag_rndv_cancel(entity &e, void *op)
171 {
172 return uct_ep_tag_rndv_cancel(e.ep(0), op);
173 }
174
tag_rndv_request(entity & e,send_ctx & ctx)175 ucs_status_t tag_rndv_request(entity &e, send_ctx &ctx)
176 {
177 ctx.sw_rndv = true;
178
179 if (ctx.unexp) {
180 // Unexpected flow, will need to analyze ctx data on the receiver
181 rndv_hdr hdr = {{ctx.imm_data,
182 reinterpret_cast<uint64_t>(&ctx)
183 },
184 0xFAFA
185 };
186 ctx.status = uct_ep_tag_rndv_request(e.ep(0), ctx.tag, &hdr,
187 sizeof(hdr), 0);
188 } else {
189 // Expected flow, send just plain data (will be stored in rx buf by HCA)
190 ctx.status = uct_ep_tag_rndv_request(e.ep(0), ctx.tag, ctx.mbuf->ptr(),
191 ctx.mbuf->length(), 0);
192 }
193 ctx.comp = true;
194
195 return ctx.status;
196 }
197
tag_post(entity & e,recv_ctx & ctx)198 ucs_status_t tag_post(entity &e, recv_ctx &ctx)
199 {
200 UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, ctx.mbuf->ptr(),
201 ctx.mbuf->length(), ctx.mbuf->memh(), 1);
202 return uct_iface_tag_recv_zcopy(e.iface(), ctx.tag, ctx.tmask,
203 iov, iovcnt, &ctx.uct_ctx);
204 }
205
tag_cancel(entity & e,recv_ctx & ctx,int force)206 ucs_status_t tag_cancel(entity &e, recv_ctx &ctx, int force)
207 {
208 return uct_iface_tag_recv_cancel(e.iface(), &ctx.uct_ctx, force);
209 }
210
211
212 // If expected message arrives, two callbacks should be called:
213 // tag_consumed and completed (unexpected callback should not be
214 // called). And it is vice versa if message arrives unexpectedly.
215 // If expected SW RNDV request arrives tag_consumed and sw_rndv_cb
216 // should be called.
check_rx_completion(recv_ctx & ctx,bool is_expected,uint64_t seed,ucs_status_t status=UCS_OK,bool is_sw_rndv=false)217 void check_rx_completion(recv_ctx &ctx, bool is_expected, uint64_t seed,
218 ucs_status_t status = UCS_OK, bool is_sw_rndv = false)
219 {
220 EXPECT_EQ(ctx.consumed, is_expected);
221 EXPECT_EQ(ctx.comp, (is_expected && !is_sw_rndv));
222 EXPECT_EQ(ctx.unexp, (!is_expected && !is_sw_rndv));
223 EXPECT_EQ(ctx.sw_rndv, is_sw_rndv);
224 EXPECT_EQ(ctx.status, status);
225 if (is_expected) {
226 ctx.mbuf->pattern_check(seed);
227 }
228 }
229
check_tx_completion(send_ctx & ctx)230 void check_tx_completion(send_ctx &ctx)
231 {
232 wait_for_flag(&ctx.comp);
233 EXPECT_TRUE(ctx.comp);
234 EXPECT_EQ(ctx.status, UCS_OK);
235 }
236
test_tag_expected(send_func sfunc,size_t length=75,bool is_sw_rndv=false)237 void test_tag_expected(send_func sfunc, size_t length = 75,
238 bool is_sw_rndv = false) {
239 uct_tag_t tag = 11;
240
241 if (RUNNING_ON_VALGRIND) {
242 length = ucs_min(length, 128U);
243 }
244
245 mapped_buffer recvbuf(length, RECV_SEED, receiver());
246 recv_ctx r_ctx;
247 init_recv_ctx(r_ctx, &recvbuf, tag);
248 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
249
250 short_progress_loop();
251
252 mapped_buffer sendbuf(length, SEND_SEED, sender());
253 send_ctx s_ctx;
254 init_send_ctx(s_ctx, &sendbuf, tag, reinterpret_cast<uint64_t>(&r_ctx),
255 false);
256 ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));
257
258 // max rndv can be quite big, use increased timeout
259 wait_for_flag(is_sw_rndv ? &r_ctx.sw_rndv : &r_ctx.comp,
260 3 * DEFAULT_TIMEOUT_SEC);
261
262 check_rx_completion(r_ctx, true, SEND_SEED, UCS_OK, is_sw_rndv);
263
264 // If it was RNDV send, need to wait send completion as well
265 check_tx_completion(s_ctx);
266
267 flush();
268 }
269
test_tag_unexpected(send_func sfunc,size_t length=75,bool take_uct_desc=false)270 void test_tag_unexpected(send_func sfunc, size_t length = 75,
271 bool take_uct_desc = false)
272 {
273 uct_tag_t tag = 11;
274
275 if (RUNNING_ON_VALGRIND) {
276 length = ucs_min(length, 128U);
277 }
278
279 mapped_buffer recvbuf(length, RECV_SEED, receiver());
280 mapped_buffer sendbuf(length, SEND_SEED, sender());
281 recv_ctx r_ctx;
282 init_recv_ctx(r_ctx, &recvbuf, tag, MASK, take_uct_desc);
283 send_ctx s_ctx;
284 init_send_ctx(s_ctx, &sendbuf, tag, reinterpret_cast<uint64_t>(&r_ctx));
285 ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));
286
287 wait_for_flag(&r_ctx.unexp);
288 if (static_cast<send_func>(&test_tag::tag_rndv_zcopy) == sfunc) {
289 // Need to cancel origin RNDV operation, beacuse no RNDV_COMP
290 // will be received (as it arrived unexpectedly and should be
291 // handled by SW).
292 ASSERT_UCS_OK(tag_rndv_cancel(sender(), s_ctx.rndv_op));
293 }
294
295 check_rx_completion(r_ctx, false, SEND_SEED);
296 flush();
297 }
298
test_tag_wrong_tag(send_func sfunc)299 void test_tag_wrong_tag(send_func sfunc)
300 {
301 const size_t length = 65;
302 uct_tag_t tag = 11;
303
304 mapped_buffer sendbuf(length, SEND_SEED, sender());
305 mapped_buffer recvbuf(length, RECV_SEED, receiver());
306
307 // Post modified tag for incoming message to be reported as unexpected
308 // and not to be macthed.
309 recv_ctx r_ctx;
310 init_recv_ctx(r_ctx, &recvbuf, tag + 1);
311 send_ctx s_ctx;
312 init_send_ctx(s_ctx, &sendbuf, tag, reinterpret_cast<uint64_t>(&r_ctx));
313
314 ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));
315
316 short_progress_loop();
317
318 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
319
320 wait_for_flag(&r_ctx.unexp);
321
322 // Message should be reported as unexpected and filled with
323 // recv seed (unchanged), as the incoming tag does not match the expected
324 check_rx_completion(r_ctx, false, RECV_SEED);
325 ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
326 flush();
327 }
328
test_tag_mask(send_func sfunc)329 void test_tag_mask(send_func sfunc)
330 {
331 const size_t length = 65;
332
333 mapped_buffer recvbuf(length, RECV_SEED, receiver());
334
335 // Post tag and tag mask in a way that it matches sender tag with
336 // tag_mask applied, but is not exactly the same.
337 recv_ctx r_ctx;
338 init_recv_ctx(r_ctx, &recvbuf, 0xff, 0xff);
339 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
340
341 short_progress_loop();
342
343 mapped_buffer sendbuf(length, SEND_SEED, sender());
344 send_ctx s_ctx;
345 init_send_ctx(s_ctx, &sendbuf, 0xffff, reinterpret_cast<uint64_t>(&r_ctx));
346 ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));
347 wait_for_flag(&r_ctx.comp);
348
349 // Should be matched because tags are equal with tag mask applied.
350 check_rx_completion(r_ctx, true, SEND_SEED);
351
352 // If it was RNDV send, need to wait send completion as well
353 check_tx_completion(s_ctx);
354 flush();
355 }
356
unexpected_handler(recv_ctx * ctx,void * data,unsigned flags)357 ucs_status_t unexpected_handler(recv_ctx *ctx, void *data, unsigned flags)
358 {
359 if (ctx->take_uct_desc && (flags & UCT_CB_PARAM_FLAG_DESC)) {
360 m_uct_descs.push_back(data);
361 return UCS_INPROGRESS;
362 } else {
363 return UCS_OK;
364 }
365 }
366
tag_consumed(uct_tag_context_t * self)367 static void tag_consumed(uct_tag_context_t *self)
368 {
369 recv_ctx *user_ctx = ucs_container_of(self, recv_ctx, uct_ctx);
370 user_ctx->consumed = true;
371 }
372
completed(uct_tag_context_t * self,uct_tag_t stag,uint64_t imm,size_t length,ucs_status_t status)373 static void completed(uct_tag_context_t *self, uct_tag_t stag, uint64_t imm,
374 size_t length, ucs_status_t status)
375 {
376 recv_ctx *user_ctx = ucs_container_of(self, recv_ctx, uct_ctx);
377 user_ctx->comp = true;
378 user_ctx->status = status;
379 EXPECT_EQ(user_ctx->tag, (stag & user_ctx->tmask));
380 EXPECT_EQ(user_ctx->mbuf->length(), length);
381 }
382
sw_rndv_completed(uct_tag_context_t * self,uct_tag_t stag,const void * header,unsigned header_length,ucs_status_t status)383 static void sw_rndv_completed(uct_tag_context_t *self, uct_tag_t stag,
384 const void *header, unsigned header_length,
385 ucs_status_t status)
386 {
387 recv_ctx *user_ctx = ucs_container_of(self, recv_ctx, uct_ctx);
388 user_ctx->sw_rndv = true;
389 user_ctx->status = status;
390 EXPECT_EQ(user_ctx->tag, (stag & user_ctx->tmask));
391 EXPECT_EQ(user_ctx->mbuf->length(), header_length);
392 }
393
unexp_eager(void * arg,void * data,size_t length,unsigned flags,uct_tag_t stag,uint64_t imm,void ** context)394 static ucs_status_t unexp_eager(void *arg, void *data, size_t length,
395 unsigned flags, uct_tag_t stag,
396 uint64_t imm, void **context)
397 {
398 recv_ctx *user_ctx = reinterpret_cast<recv_ctx*>(imm);
399 user_ctx->unexp = true;
400 user_ctx->status = UCS_OK;
401 if (user_ctx->tag == stag) {
402 memcpy(user_ctx->mbuf->ptr(), data, ucs_min(length,
403 user_ctx->mbuf->length()));
404 user_ctx->mbuf->pattern_check(SEND_SEED);
405 }
406
407 test_tag *self = reinterpret_cast<test_tag*>(arg);
408 return self->unexpected_handler(user_ctx, data, flags);
409 }
410
unexp_rndv(void * arg,unsigned flags,uint64_t stag,const void * header,unsigned header_length,uint64_t remote_addr,size_t length,const void * rkey_buf)411 static ucs_status_t unexp_rndv(void *arg, unsigned flags, uint64_t stag,
412 const void *header, unsigned header_length,
413 uint64_t remote_addr, size_t length,
414 const void *rkey_buf)
415 {
416 rndv_hdr *rhdr = const_cast<rndv_hdr*>(static_cast<const rndv_hdr*>(header));
417 recv_ctx *r_ctx = reinterpret_cast<recv_ctx*>(rhdr->priv[0]);
418 send_ctx *s_ctx = reinterpret_cast<send_ctx*>(rhdr->priv[1]);
419 uint16_t tail = rhdr->tail;
420 r_ctx->unexp = true;
421 r_ctx->status = UCS_OK;
422
423 EXPECT_EQ(tail, 0xFAFA);
424 EXPECT_EQ(s_ctx->tag, stag);
425 EXPECT_EQ(length, s_ctx->sw_rndv ? 0 : s_ctx->mbuf->length());
426 EXPECT_EQ(remote_addr, s_ctx->sw_rndv ? 0ul :
427 reinterpret_cast<uint64_t>(s_ctx->mbuf->ptr()));
428
429 test_tag *self = reinterpret_cast<test_tag*>(arg);
430 return self->unexpected_handler(r_ctx, const_cast<void*>(header), flags);
431 }
432
am_handler(void * arg,void * data,size_t length,unsigned flags)433 static ucs_status_t am_handler(void *arg, void *data, size_t length,
434 unsigned flags)
435 {
436 is_am_received = true;
437 return UCS_OK;
438 }
439
440 static ucs_log_func_rc_t
log_ep_destroy(const char * file,unsigned line,const char * function,ucs_log_level_t level,const ucs_log_component_config_t * comp_conf,const char * message,va_list ap)441 log_ep_destroy(const char *file, unsigned line, const char *function,
442 ucs_log_level_t level,
443 const ucs_log_component_config_t *comp_conf,
444 const char *message, va_list ap)
445 {
446 if (level == UCS_LOG_LEVEL_WARN) {
447 // Ignore warnings about uncompleted operations during ep destroy
448 return UCS_LOG_FUNC_RC_STOP;
449 }
450 return UCS_LOG_FUNC_RC_CONTINUE;
451 }
452
send_completion(uct_completion_t * self,ucs_status_t status)453 static void send_completion(uct_completion_t *self, ucs_status_t status)
454 {
455 send_ctx *user_ctx = ucs_container_of(self, send_ctx, uct_comp);
456 user_ctx->comp = true;
457 user_ctx->status = status;
458 }
459
460
461 protected:
sender()462 uct_test::entity& sender() {
463 return **m_entities.begin();
464 }
465
receiver()466 uct_test::entity& receiver() {
467 return **(m_entities.end() - 1);
468 }
469
470 std::vector<void*> m_uct_descs;
471
472 static bool is_am_received;
473 };
474
475 bool test_tag::is_am_received = false;
476
477 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_short_expected,
478 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_SHORT))
479 {
480 test_tag_expected(static_cast<send_func>(&test_tag::tag_eager_short),
481 sender().iface_attr().cap.tag.eager.max_short);
482 }
483
484 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_expected,
485 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
486 {
487 test_tag_expected(static_cast<send_func>(&test_tag::tag_eager_bcopy),
488 sender().iface_attr().cap.tag.eager.max_bcopy);
489 }
490
491 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_expected,
492 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
493 {
494 test_tag_expected(static_cast<send_func>(&test_tag::tag_eager_zcopy),
495 sender().iface_attr().cap.tag.eager.max_zcopy);
496 }
497
498 UCS_TEST_SKIP_COND_P(test_tag, tag_rndv_zcopy_expected,
499 !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
500 {
501 test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_zcopy),
502 sender().iface_attr().cap.tag.rndv.max_zcopy);
503 }
504
505 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_unexpected,
506 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
507 {
508 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_bcopy),
509 sender().iface_attr().cap.tag.eager.max_bcopy);
510 }
511
512 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_unexpected,
513 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
514 {
515 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_zcopy),
516 sender().iface_attr().cap.tag.eager.max_bcopy);
517 }
518
519 UCS_TEST_SKIP_COND_P(test_tag, tag_rndv_zcopy_unexpected,
520 !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
521 {
522 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_zcopy));
523 }
524
525 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_wrong_tag,
526 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
527 {
528 test_tag_wrong_tag(static_cast<send_func>(&test_tag::tag_eager_bcopy));
529 }
530
531 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_wrong_tag,
532 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
533 {
534 test_tag_wrong_tag(static_cast<send_func>(&test_tag::tag_eager_zcopy));
535 }
536
537 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_short_tag_mask,
538 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_SHORT))
539 {
540 test_tag_mask(static_cast<send_func>(&test_tag::tag_eager_short));
541 }
542
543 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_tag_mask,
544 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
545 {
546 test_tag_mask(static_cast<send_func>(&test_tag::tag_eager_bcopy));
547 }
548
549 UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_tag_mask,
550 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
551 {
552 test_tag_mask(static_cast<send_func>(&test_tag::tag_eager_zcopy));
553 }
554
555 UCS_TEST_SKIP_COND_P(test_tag, tag_rndv_zcopy_tag_mask,
556 !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
557 {
558 test_tag_mask(static_cast<send_func>(&test_tag::tag_rndv_zcopy));
559 }
560
561 UCS_TEST_SKIP_COND_P(test_tag, tag_hold_uct_desc,
562 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
563 UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
564 {
565 int n = 10;
566 int msg_size = ucs_min(sender().iface_attr().cap.tag.eager.max_bcopy,
567 sender().iface_attr().cap.tag.rndv.max_zcopy);
568 for (int i = 0; i < n; ++i) {
569 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_bcopy),
570 msg_size, true);
571
572 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_zcopy),
573 msg_size, true);
574 }
575
576 for (ucs::ptr_vector<void>::const_iterator iter = m_uct_descs.begin();
577 iter != m_uct_descs.end(); ++iter)
578 {
579 uct_iface_release_desc(*iter);
580 }
581 }
582
583
584 UCS_TEST_SKIP_COND_P(test_tag, tag_send_no_tag,
585 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
586 {
587 uct_iface_set_am_handler(receiver().iface(), 0, am_handler, NULL, 0);
588 mapped_buffer lbuf(200, SEND_SEED, sender());
589 ssize_t len = uct_ep_am_bcopy(sender().ep(0), 0, mapped_buffer::pack,
590 reinterpret_cast<void*>(&lbuf), 0);
591 EXPECT_EQ(lbuf.length(), static_cast<size_t>(len));
592 wait_for_flag(&is_am_received);
593 EXPECT_TRUE(is_am_received);
594 }
595
596 UCS_TEST_SKIP_COND_P(test_tag, tag_cancel_force,
597 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
598 {
599 const size_t length = 128;
600 mapped_buffer recvbuf(length, RECV_SEED, receiver());
601 recv_ctx r_ctx;
602 init_recv_ctx(r_ctx, &recvbuf, 1);
603
604 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
605 short_progress_loop(200);
606 ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
607
608 short_progress_loop();
609
610 mapped_buffer sendbuf(length, SEND_SEED, sender());
611 send_ctx s_ctx;
612 init_send_ctx(s_ctx, &sendbuf, 1, reinterpret_cast<uint64_t>(&r_ctx));
613 ASSERT_UCS_OK(tag_eager_bcopy(sender(), s_ctx));
614
615 // Message should arrive unexpected, since tag was cancelled
616 // on the receiver.
617 wait_for_flag(&r_ctx.unexp);
618 check_rx_completion(r_ctx, false, SEND_SEED);
619 }
620
621 UCS_TEST_SKIP_COND_P(test_tag, tag_cancel_noforce,
622 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
623 {
624 const size_t length = 128;
625 mapped_buffer recvbuf(length, RECV_SEED, receiver());
626 recv_ctx r_ctx;
627 init_recv_ctx(r_ctx, &recvbuf, 1);
628
629 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
630 short_progress_loop(200);
631 ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 0));
632
633 wait_for_flag(&r_ctx.comp);
634
635 // Check that completed callback has been called with CANCELED status
636 // (because 0 was passed as force parameter to cancel).
637 EXPECT_TRUE(r_ctx.comp);
638 EXPECT_EQ(r_ctx.status, UCS_ERR_CANCELED);
639 }
640
641 UCS_TEST_SKIP_COND_P(test_tag, tag_limit,
642 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
643 {
644 const size_t length = 32;
645 ucs::ptr_vector<recv_ctx> rctxs;
646 ucs::ptr_vector<mapped_buffer> rbufs;
647 ucs_status_t status;
648
649 do {
650 recv_ctx *rctx_p = new recv_ctx();
651 mapped_buffer *buf_p = new mapped_buffer(length, RECV_SEED, receiver());
652 init_recv_ctx(*rctx_p, buf_p, 1);
653 rctxs.push_back(rctx_p);
654 rbufs.push_back(buf_p);
655 status = tag_post(receiver(), *rctx_p);
656 // Make sure send resources are acknowledged, as we
657 // awaiting for tag space exhaustion.
658 short_progress_loop();
659 } while (status == UCS_OK);
660
661 EXPECT_EQ(status, UCS_ERR_EXCEEDS_LIMIT);
662
663 // Cancel one of the postings
664 ASSERT_UCS_OK(tag_cancel(receiver(), rctxs.at(0), 1));
665 short_progress_loop();
666
667 // Check we can post again within a reasonable time
668 ucs_time_t deadline = ucs_get_time() + ucs_time_from_sec(20.0);
669 do {
670 status = tag_post(receiver(), rctxs.at(0));
671 } while ((ucs_get_time() < deadline) && (status == UCS_ERR_EXCEEDS_LIMIT));
672 ASSERT_UCS_OK(status);
673
674 // remove posted tags from HW
675 for (ucs::ptr_vector<recv_ctx>::const_iterator iter = rctxs.begin();
676 iter != rctxs.end() - 1; ++iter) {
677 ASSERT_UCS_OK(tag_cancel(receiver(), **iter, 1));
678 }
679 }
680
681 UCS_TEST_SKIP_COND_P(test_tag, tag_post_same,
682 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
683 {
684 const size_t length = 128;
685 mapped_buffer recvbuf(length, RECV_SEED, receiver());
686 recv_ctx r_ctx;
687 init_recv_ctx(r_ctx, &recvbuf, 1);
688
689 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
690
691 // Can't post the same buffer until it is completed/cancelled
692 ucs_status_t status = tag_post(receiver(), r_ctx);
693 EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS);
694
695 // Cancel with force, should be able to re-post immediately
696 ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
697 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
698
699 // Cancel without force, should be able to re-post when receive completion
700 ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 0));
701 status = tag_post(receiver(), r_ctx);
702 EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS); // no completion yet
703
704 wait_for_flag(&r_ctx.comp); // cancel completed, should be able to post
705 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
706
707 // Now send something to trigger rx completion
708 init_recv_ctx(r_ctx, &recvbuf, 1); // reinit rx to clear completed states
709 mapped_buffer sendbuf(length, SEND_SEED, sender());
710 send_ctx s_ctx;
711 init_send_ctx(s_ctx, &sendbuf, 1, reinterpret_cast<uint64_t>(&r_ctx));
712 ASSERT_UCS_OK(tag_eager_bcopy(sender(), s_ctx));
713
714 wait_for_flag(&r_ctx.comp); // message consumed, should be able to post
715 ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
716
717 ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
718 }
719
720 UCS_TEST_SKIP_COND_P(test_tag, sw_rndv_expected,
721 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
722 UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
723 {
724 test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_request),
725 sender().iface_attr().cap.tag.rndv.max_hdr, true);
726 }
727
728 UCS_TEST_SKIP_COND_P(test_tag, rndv_limit,
729 !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
730 {
731 mapped_buffer sendbuf(8, SEND_SEED, sender());
732 ucs::ptr_vector<send_ctx> sctxs;
733 ucs_status_t status;
734 send_ctx *sctx_p;
735 void *op;
736
737 do {
738 sctx_p = new send_ctx;
739 init_send_ctx(*sctx_p, &sendbuf, 0xffff, 0);
740 status = tag_rndv_zcopy(sender(), *sctx_p);
741 sctxs.push_back(sctx_p);
742 } while (status == UCS_OK);
743
744 EXPECT_EQ(status, UCS_ERR_NO_RESOURCE);
745
746 for (ucs::ptr_vector<send_ctx>::const_iterator iter = sctxs.begin();
747 iter != sctxs.end(); ++iter)
748 {
749 op = (*iter)->rndv_op;
750 if (!UCS_PTR_IS_ERR(op)) {
751 tag_rndv_cancel(sender(), op);
752 }
753 }
754
755 ucs_log_push_handler(log_ep_destroy);
756 sender().destroy_eps();
757 ucs_log_pop_handler();
758 }
759
760 UCS_TEST_SKIP_COND_P(test_tag, sw_rndv_unexpected,
761 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
762 UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
763 {
764 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_request));
765 }
766
UCT_TAG_INSTANTIATE_TEST_CASE(test_tag)767 UCT_TAG_INSTANTIATE_TEST_CASE(test_tag)
768
769
770 #if defined (ENABLE_STATS) && IBV_HW_TM
771 extern "C" {
772 #include <uct/api/uct.h>
773 #include <uct/ib/rc/accel/rc_mlx5_common.h>
774 #include <uct/ib/base/ib_verbs.h>
775 }
776
777 class test_tag_stats : public test_tag {
778 public:
init()779 void init() {
780 stats_activate();
781 test_tag::init();
782 }
783
cleanup()784 void cleanup() {
785 test_tag::cleanup();
786 stats_restore();
787 }
788
ep_stats(const entity & e)789 ucs_stats_node_t *ep_stats(const entity &e)
790 {
791 return ucs_derived_of(e.ep(0), uct_base_ep_t)->stats;
792 }
793
iface_stats(const entity & e)794 ucs_stats_node_t *iface_stats(const entity &e)
795 {
796 return ucs_derived_of(e.iface(), uct_rc_mlx5_iface_common_t)->tm.stats;
797 }
798
provoke_sync(const entity & e)799 void provoke_sync(const entity &e)
800 {
801 uct_rc_mlx5_iface_common_t *iface;
802
803 iface = ucs_derived_of(e.iface(), uct_rc_mlx5_iface_common_t);
804
805 // Counters are synced every IBV_DEVICE_MAX_UNEXP_COUNT ops, set
806 // it one op before, so that any following unexpected message would
807 // cause HW ans SW counters sync.
808 iface->tm.unexpected_cnt = IBV_DEVICE_MAX_UNEXP_COUNT - 1;
809 }
810
check_tx_counters(int op,uint64_t op_val,int type,size_t len)811 void check_tx_counters(int op, uint64_t op_val, int type, size_t len)
812 {
813 uint64_t v;
814
815 v = UCS_STATS_GET_COUNTER(ep_stats(sender()), op);
816 EXPECT_EQ(op_val, v);
817
818 // With valgrind reduced messages is sent
819 if (!RUNNING_ON_VALGRIND) {
820 v = UCS_STATS_GET_COUNTER(ep_stats(sender()), type);
821 EXPECT_EQ(len, v);
822 }
823 }
824
check_rx_counter(int op,uint64_t val,entity & e)825 void check_rx_counter(int op, uint64_t val, entity &e)
826 {
827 EXPECT_EQ(val, UCS_STATS_GET_COUNTER(iface_stats(e), op));
828 }
829 };
830
831 UCS_TEST_SKIP_COND_P(test_tag_stats, tag_expected_eager,
832 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_SHORT |
833 UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
834 UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
835 {
836 std::pair<send_func, std::pair<size_t, int> > sfuncs[3] = {
837 std::make_pair(static_cast<send_func>(&test_tag::tag_eager_short),
838 std::make_pair(sender().iface_attr().cap.tag.eager.max_short,
839 static_cast<int>(UCT_EP_STAT_BYTES_SHORT))),
840
841 std::make_pair(static_cast<send_func>(&test_tag::tag_eager_bcopy),
842 std::make_pair(sender().iface_attr().cap.tag.eager.max_bcopy,
843 static_cast<int>(UCT_EP_STAT_BYTES_BCOPY))),
844
845 std::make_pair(static_cast<send_func>(&test_tag::tag_eager_zcopy),
846 std::make_pair(sender().iface_attr().cap.tag.eager.max_zcopy,
847 static_cast<int>(UCT_EP_STAT_BYTES_ZCOPY)))
848 };
849
850 for (int i = 0; i < 3; ++i) {
851 test_tag_expected(sfuncs[i].first, sfuncs[i].second.first);
852 check_tx_counters(UCT_EP_STAT_TAG, i + 1,
853 sfuncs[i].second.second,
854 sfuncs[i].second.first);
855 check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_EXP, i + 1, receiver());
856 }
857 }
858
859 UCS_TEST_SKIP_COND_P(test_tag_stats, tag_unexpected_eager,
860 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
861 UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
862 {
863 std::pair<send_func, std::pair<size_t, int> > sfuncs[2] = {
864 std::make_pair(static_cast<send_func>(&test_tag::tag_eager_bcopy),
865 std::make_pair(sender().iface_attr().cap.tag.eager.max_bcopy,
866 static_cast<int>(UCT_EP_STAT_BYTES_BCOPY))),
867
868 std::make_pair(static_cast<send_func>(&test_tag::tag_eager_zcopy),
869 std::make_pair(sender().iface_attr().cap.tag.eager.max_zcopy,
870 static_cast<int>(UCT_EP_STAT_BYTES_ZCOPY)))
871 };
872
873 for (int i = 0; i < 2; ++i) {
874 test_tag_unexpected(sfuncs[i].first, sfuncs[i].second.first);
875 check_tx_counters(UCT_EP_STAT_TAG, i + 1,
876 sfuncs[i].second.second,
877 sfuncs[i].second.first);
878 check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_EAGER_UNEXP, i + 1, receiver());
879 }
880 }
881
882 UCS_TEST_SKIP_COND_P(test_tag_stats, tag_list_ops,
883 !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
884 {
885 mapped_buffer recvbuf(32, RECV_SEED, receiver());
886 recv_ctx rctx;
887
888 init_recv_ctx(rctx, &recvbuf, 1);
889
890 ASSERT_UCS_OK(tag_post(receiver(), rctx));
891 check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_ADD, 1ul, receiver());
892
893 ASSERT_UCS_OK(tag_cancel(receiver(), rctx, 1));
894 check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_DEL, 1ul, receiver());
895
896 // Every ADD and DEL is paired with SYNC, but stats counter is increased
897 // when separate SYNC op is issued only. So, we expect it to be 0 after
898 // ADD and DEL operations.
899 check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_SYNC, 0ul, receiver());
900
901 // Provoke real SYNC op and send a message unexpectedly
902 provoke_sync(receiver());
903 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_bcopy));
904 check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_SYNC, 1ul, receiver());
905 }
906
907
908 UCS_TEST_SKIP_COND_P(test_tag_stats, tag_rndv,
909 !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY |
910 UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
911 {
912 size_t len = sender().iface_attr().cap.tag.rndv.max_zcopy / 8;
913
914 // Check UNEXP_RNDV on the receiver
915 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_zcopy), len);
916 check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_UNEXP, 1ul, receiver());
917
918 // Check that sender receives RNDV_FIN in case of expected rndv message
919 test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_zcopy), len);
920 check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_FIN, 1ul, sender());
921
922
923 // Check UNEXP_RNDV_REQ on the receiver
924 test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_request));
925 check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_REQ_UNEXP, 1ul, receiver());
926
927 // Check NEXP_RNDV_REQ on the receiver
928 test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_request),
929 sender().iface_attr().cap.tag.rndv.max_hdr, true);
930 check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_REQ_EXP, 1ul, receiver());
931 }
932
UCT_TAG_INSTANTIATE_TEST_CASE(test_tag_stats)933 UCT_TAG_INSTANTIATE_TEST_CASE(test_tag_stats)
934
935 #endif
936
937
938 #if IBV_HW_TM
939
940 extern "C" {
941 #include <uct/ib/rc/accel/rc_mlx5_common.h>
942 }
943
944 // TODO: Unite with test_tag + add GRH testing for DC
945 class test_tag_mp_xrq : public uct_test {
946 public:
947 static const uint64_t SEND_SEED = 0xa1a1a1a1a1a1a1a1ul;
948 static const uint64_t AM_ID = 1;
949 typedef void (test_tag_mp_xrq::*send_func)(mapped_buffer*);
950
951 virtual void init();
952 test_tag_mp_xrq();
953 uct_rc_mlx5_iface_common_t* rc_mlx5_iface(entity &e);
954 void send_eager_bcopy(mapped_buffer *buf);
955 void send_eager_zcopy(mapped_buffer *buf);
956 void send_rndv_zcopy(mapped_buffer *buf);
957 void send_rndv_request(mapped_buffer *buf);
958 void send_am_bcopy(mapped_buffer *buf);
959 void test_common(send_func sfunc, size_t num_segs, size_t exp_segs = 1,
960 bool is_eager = true);
961
962 static ucs_status_t am_handler(void *arg, void *data, size_t length,
963 unsigned flags);
964
965 static ucs_status_t unexp_eager(void *arg, void *data, size_t length,
966 unsigned flags, uct_tag_t stag,
967 uint64_t imm, void **context);
968
969 static ucs_status_t unexp_rndv(void *arg, unsigned flags, uint64_t stag,
970 const void *header, unsigned header_length,
971 uint64_t remote_addr, size_t length,
972 const void *rkey_buf);
973
974 protected:
975 static size_t m_rx_counter;
976 std::vector<void*> m_uct_descs;
977 bool m_hold_uct_desc;
978
sender()979 uct_test::entity& sender() {
980 return **m_entities.begin();
981 }
982
receiver()983 uct_test::entity& receiver() {
984 return **(m_entities.end() - 1);
985 }
986
987 private:
988 ucs_status_t unexp_handler(void *data, unsigned flags, uint64_t imm,
989 void **context);
990 ucs_status_t handle_uct_desc(void *data, unsigned flags);
991 void set_env_var_or_skip(void *config, const char *var, const char *val);
992 size_t m_max_hdr;
993 bool m_first_received;
994 bool m_last_received;
995 uct_completion_t m_uct_comp;
996 };
997
998 size_t test_tag_mp_xrq::m_rx_counter = 0;
999
test_tag_mp_xrq()1000 test_tag_mp_xrq::test_tag_mp_xrq() : m_hold_uct_desc(false),
1001 m_first_received(false),
1002 m_last_received(false)
1003 {
1004 m_max_hdr = sizeof(ibv_tmh) + sizeof(ibv_rvh);
1005 m_uct_comp.count = 512; // We do not need completion func to be invoked
1006 m_uct_comp.func = NULL;
1007 }
1008
rc_mlx5_iface(entity & e)1009 uct_rc_mlx5_iface_common_t* test_tag_mp_xrq::rc_mlx5_iface(entity &e)
1010 {
1011 return ucs_derived_of(e.iface(), uct_rc_mlx5_iface_common_t);
1012 }
1013
set_env_var_or_skip(void * config,const char * var,const char * val)1014 void test_tag_mp_xrq::set_env_var_or_skip(void *config, const char *var,
1015 const char *val)
1016 {
1017 ucs_status_t status = uct_config_modify(config, var, val);
1018 if (status != UCS_OK) {
1019 ucs_warn("%s", ucs_status_string(status));
1020 UCS_TEST_SKIP_R(std::string("Can't set ") + var);
1021 }
1022 }
1023
init()1024 void test_tag_mp_xrq::init()
1025 {
1026 set_env_var_or_skip(m_iface_config, "RC_TM_ENABLE", "y");
1027 set_env_var_or_skip(m_iface_config, "RC_TM_MP_SRQ_ENABLE", "try");
1028 set_env_var_or_skip(m_iface_config, "RC_TM_MP_NUM_STRIDES", "8");
1029 set_env_var_or_skip(m_md_config, "MLX5_DEVX_OBJECTS", "dct,dcsrq,rcsrq,rcqp");
1030
1031 uct_test::init();
1032
1033 entity *sender = uct_test::create_entity(0ul, NULL, unexp_eager, unexp_rndv,
1034 reinterpret_cast<void*>(this),
1035 reinterpret_cast<void*>(this));
1036 m_entities.push_back(sender);
1037
1038 entity *receiver = uct_test::create_entity(0ul, NULL, unexp_eager, unexp_rndv,
1039 reinterpret_cast<void*>(this),
1040 reinterpret_cast<void*>(this));
1041 m_entities.push_back(receiver);
1042
1043 if (!UCT_RC_MLX5_MP_ENABLED(rc_mlx5_iface(test_tag_mp_xrq::sender()))) {
1044 UCS_TEST_SKIP_R("No MP XRQ support");
1045 }
1046
1047 sender->connect(0, *receiver, 0);
1048
1049 uct_iface_set_am_handler(receiver->iface(), AM_ID, am_handler, this, 0);
1050 }
1051
send_eager_bcopy(mapped_buffer * buf)1052 void test_tag_mp_xrq::send_eager_bcopy(mapped_buffer *buf)
1053 {
1054 ssize_t len = uct_ep_tag_eager_bcopy(sender().ep(0), 0x11,
1055 reinterpret_cast<uint64_t>(this),
1056 mapped_buffer::pack,
1057 reinterpret_cast<void*>(buf), 0);
1058
1059 EXPECT_EQ(buf->length(), static_cast<size_t>(len));
1060 }
1061
send_eager_zcopy(mapped_buffer * buf)1062 void test_tag_mp_xrq::send_eager_zcopy(mapped_buffer *buf)
1063 {
1064 UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, buf->ptr(), buf->length(), buf->memh(),
1065 sender().iface_attr().cap.tag.eager.max_iov);
1066
1067 ucs_status_t status = uct_ep_tag_eager_zcopy(sender().ep(0), 0x11,
1068 reinterpret_cast<uint64_t>(this),
1069 iov, iovcnt, 0, &m_uct_comp);
1070 ASSERT_UCS_OK_OR_INPROGRESS(status);
1071 }
1072
send_rndv_zcopy(mapped_buffer * buf)1073 void test_tag_mp_xrq::send_rndv_zcopy(mapped_buffer *buf)
1074 {
1075 UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, buf->ptr(), buf->length(), buf->memh(),
1076 sender().iface_attr().cap.tag.rndv.max_iov);
1077
1078 uint64_t dummy_hdr = 0xFAFA;
1079 ucs_status_ptr_t rndv_op = uct_ep_tag_rndv_zcopy(sender().ep(0), 0x11, &dummy_hdr,
1080 sizeof(dummy_hdr), iov,
1081 iovcnt, 0, &m_uct_comp);
1082 ASSERT_FALSE(UCS_PTR_IS_ERR(rndv_op));
1083
1084 // There will be no real RNDV performed, cancel the op to avoid mpool
1085 // warning on exit
1086 ASSERT_UCS_OK(uct_ep_tag_rndv_cancel(sender().ep(0),rndv_op));
1087 }
1088
send_rndv_request(mapped_buffer * buf)1089 void test_tag_mp_xrq::send_rndv_request(mapped_buffer *buf)
1090 {
1091 size_t size = sender().iface_attr().cap.tag.rndv.max_hdr;
1092 void *hdr = alloca(size);
1093
1094 ASSERT_UCS_OK(uct_ep_tag_rndv_request(sender().ep(0), 0x11, hdr, size, 0));
1095 }
1096
send_am_bcopy(mapped_buffer * buf)1097 void test_tag_mp_xrq::send_am_bcopy(mapped_buffer *buf)
1098 {
1099 ssize_t len = uct_ep_am_bcopy(sender().ep(0), AM_ID, mapped_buffer::pack,
1100 reinterpret_cast<void*>(buf), 0);
1101
1102 EXPECT_EQ(buf->length(), static_cast<size_t>(len));
1103 }
1104
test_common(send_func sfunc,size_t num_segs,size_t exp_segs,bool is_eager)1105 void test_tag_mp_xrq::test_common(send_func sfunc, size_t num_segs,
1106 size_t exp_segs, bool is_eager)
1107 {
1108 size_t seg_size = rc_mlx5_iface(sender())->super.super.config.seg_size;
1109 size_t seg_num = is_eager ? num_segs : 1;
1110 size_t exp_val = is_eager ? exp_segs : 1;
1111 size_t size = (seg_size * seg_num) - m_max_hdr;
1112 m_rx_counter = 0;
1113 m_first_received = m_last_received = false;
1114
1115 EXPECT_TRUE(size <= sender().iface_attr().cap.tag.eager.max_bcopy);
1116 mapped_buffer buf(size, SEND_SEED, sender());
1117
1118 (this->*sfunc)(&buf);
1119
1120 wait_for_value(&m_rx_counter, exp_val, true);
1121 EXPECT_EQ(exp_val, m_rx_counter);
1122 EXPECT_EQ(is_eager, m_first_received); // relevant for eager only
1123 EXPECT_EQ(is_eager, m_last_received); // relevant for eager only
1124 }
1125
handle_uct_desc(void * data,unsigned flags)1126 ucs_status_t test_tag_mp_xrq::handle_uct_desc(void *data, unsigned flags)
1127 {
1128 if ((flags & UCT_CB_PARAM_FLAG_DESC) && m_hold_uct_desc) {
1129 m_uct_descs.push_back(data);
1130 return UCS_INPROGRESS;
1131 }
1132
1133 return UCS_OK;
1134 }
1135
am_handler(void * arg,void * data,size_t length,unsigned flags)1136 ucs_status_t test_tag_mp_xrq::am_handler(void *arg, void *data, size_t length,
1137 unsigned flags)
1138 {
1139 EXPECT_TRUE(flags & UCT_CB_PARAM_FLAG_FIRST);
1140 EXPECT_FALSE(flags & UCT_CB_PARAM_FLAG_MORE);
1141
1142 m_rx_counter++;
1143
1144 test_tag_mp_xrq *self = reinterpret_cast<test_tag_mp_xrq*>(arg);
1145 return self->handle_uct_desc(data, flags);
1146 }
1147
unexp_handler(void * data,unsigned flags,uint64_t imm,void ** context)1148 ucs_status_t test_tag_mp_xrq::unexp_handler(void *data, unsigned flags,
1149 uint64_t imm, void **context)
1150 {
1151 void *self = reinterpret_cast<void*>(this);
1152
1153 if (flags & UCT_CB_PARAM_FLAG_FIRST) {
1154 // Set the message context which will be passed back with the rest of
1155 // message fragments
1156 *context = self;
1157 m_first_received = true;
1158
1159 } else {
1160 // Check that the correct message context is passed with all fragments
1161 EXPECT_EQ(self, *context);
1162 }
1163
1164 if (!(flags & UCT_CB_PARAM_FLAG_MORE)) {
1165 // Last message should contain valid immediate value
1166 EXPECT_EQ(reinterpret_cast<uint64_t>(this), imm);
1167 m_last_received = true;
1168 } else {
1169 // Immediate value is passed with the last message only
1170 EXPECT_EQ(0ul, imm);
1171 }
1172
1173
1174 return handle_uct_desc(data, flags);
1175 }
1176
unexp_eager(void * arg,void * data,size_t length,unsigned flags,uct_tag_t stag,uint64_t imm,void ** context)1177 ucs_status_t test_tag_mp_xrq::unexp_eager(void *arg, void *data, size_t length,
1178 unsigned flags, uct_tag_t stag,
1179 uint64_t imm, void **context)
1180 {
1181 test_tag_mp_xrq *self = reinterpret_cast<test_tag_mp_xrq*>(arg);
1182
1183 m_rx_counter++;
1184
1185 return self->unexp_handler(data, flags, imm, context);
1186 }
1187
unexp_rndv(void * arg,unsigned flags,uint64_t stag,const void * header,unsigned header_length,uint64_t remote_addr,size_t length,const void * rkey_buf)1188 ucs_status_t test_tag_mp_xrq::unexp_rndv(void *arg, unsigned flags,
1189 uint64_t stag, const void *header,
1190 unsigned header_length,
1191 uint64_t remote_addr, size_t length,
1192 const void *rkey_buf)
1193 {
1194 EXPECT_FALSE(flags & UCT_CB_PARAM_FLAG_FIRST);
1195 EXPECT_FALSE(flags & UCT_CB_PARAM_FLAG_MORE);
1196
1197 m_rx_counter++;
1198
1199 return UCS_OK;
1200 }
1201
UCS_TEST_P(test_tag_mp_xrq,config)1202 UCS_TEST_P(test_tag_mp_xrq, config)
1203 {
1204 uct_rc_mlx5_iface_common_t *iface = rc_mlx5_iface(sender());
1205
1206 // MP XRQ is supported with tag offload only
1207 EXPECT_TRUE(UCT_RC_MLX5_TM_ENABLED(iface));
1208
1209 // With MP XRQ segment size should be equal to MTU, because HW generates
1210 // CQE per each received MTU
1211 size_t mtu = uct_ib_mtu_value(uct_ib_iface_port_attr(&(iface)->super.super)->active_mtu);
1212 EXPECT_EQ(mtu, iface->super.super.config.seg_size);
1213
1214 const uct_iface_attr *attrs = &sender().iface_attr();
1215
1216 // Max tag bcopy is limited by tag tx memory pool
1217 EXPECT_EQ(iface->tm.max_bcopy - sizeof(ibv_tmh),
1218 attrs->cap.tag.eager.max_bcopy);
1219 EXPECT_GT(attrs->cap.tag.eager.max_bcopy,
1220 iface->super.super.config.seg_size);
1221
1222 // Max tag zcopy is limited by maximal IB message size
1223 EXPECT_EQ(uct_ib_iface_port_attr(&iface->super.super)->max_msg_sz - sizeof(ibv_tmh),
1224 attrs->cap.tag.eager.max_zcopy);
1225
1226 // Maximal AM size should not exceed segment size, so it would always
1227 // arrive in one-fragment packet (with header it should be strictly less)
1228 EXPECT_LT(attrs->cap.am.max_bcopy, iface->super.super.config.seg_size);
1229 EXPECT_LT(attrs->cap.am.max_zcopy, iface->super.super.config.seg_size);
1230 }
1231
UCS_TEST_P(test_tag_mp_xrq,desc_release)1232 UCS_TEST_P(test_tag_mp_xrq, desc_release)
1233 {
1234 m_hold_uct_desc = true; // We want to "hold" UCT memory descriptors
1235 std::pair<send_func, bool> sfuncs[5] = {
1236 std::make_pair(&test_tag_mp_xrq::send_eager_bcopy, true),
1237 std::make_pair(&test_tag_mp_xrq::send_eager_zcopy, true),
1238 std::make_pair(&test_tag_mp_xrq::send_rndv_zcopy, false),
1239 std::make_pair(&test_tag_mp_xrq::send_rndv_request, false),
1240 std::make_pair(&test_tag_mp_xrq::send_am_bcopy, false)
1241 };
1242
1243 for (int i = 0; i < 5; ++i) {
1244 test_common(sfuncs[i].first, 3, 3, sfuncs[i].second);
1245 }
1246
1247 for (ucs::ptr_vector<void>::const_iterator iter = m_uct_descs.begin();
1248 iter != m_uct_descs.end(); ++iter)
1249 {
1250 uct_iface_release_desc(*iter);
1251 }
1252 }
1253
UCS_TEST_P(test_tag_mp_xrq,am)1254 UCS_TEST_P(test_tag_mp_xrq, am)
1255 {
1256 test_common(&test_tag_mp_xrq::send_am_bcopy, 1, 1, false);
1257 }
1258
UCS_TEST_P(test_tag_mp_xrq,bcopy_eager_only)1259 UCS_TEST_P(test_tag_mp_xrq, bcopy_eager_only)
1260 {
1261 test_common(&test_tag_mp_xrq::send_eager_bcopy, 1);
1262 }
1263
UCS_TEST_P(test_tag_mp_xrq,zcopy_eager_only)1264 UCS_TEST_P(test_tag_mp_xrq, zcopy_eager_only)
1265 {
1266 test_common(&test_tag_mp_xrq::send_eager_zcopy, 1);
1267 }
1268
UCS_TEST_P(test_tag_mp_xrq,bcopy_eager)1269 UCS_TEST_P(test_tag_mp_xrq, bcopy_eager)
1270 {
1271 test_common(&test_tag_mp_xrq::send_eager_bcopy, 5, 5);
1272 }
1273
UCS_TEST_P(test_tag_mp_xrq,zcopy_eager)1274 UCS_TEST_P(test_tag_mp_xrq, zcopy_eager)
1275 {
1276 test_common(&test_tag_mp_xrq::send_eager_zcopy, 5, 5);
1277 }
1278
UCS_TEST_P(test_tag_mp_xrq,rndv_zcopy)1279 UCS_TEST_P(test_tag_mp_xrq, rndv_zcopy)
1280 {
1281 test_common(&test_tag_mp_xrq::send_rndv_zcopy, 1, 1, false);
1282 }
1283
UCS_TEST_P(test_tag_mp_xrq,rndv_request)1284 UCS_TEST_P(test_tag_mp_xrq, rndv_request)
1285 {
1286 test_common(&test_tag_mp_xrq::send_rndv_request, 1, 1, false);
1287 }
1288
1289 UCT_TAG_INSTANTIATE_TEST_CASE(test_tag_mp_xrq)
1290
1291 #endif
1292