/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

extern "C" {
#include <uct/api/uct.h>
}
#include <common/test.h>
#include "uct_test.h"

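// Instantiate each tag test case for the mlx5 transports that support
// HW tag matching (rc_mlx5 and dc_mlx5).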
#define UCT_TAG_INSTANTIATE_TEST_CASE(_test_case) \
    _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_mlx5) \
    _UCT_INSTANTIATE_TEST_CASE(_test_case, dc_mlx5)

class test_tag : public uct_test {
public:
    static const uint64_t SEND_SEED  = 0xa1a1a1a1a1a1a1a1ul;
    static const uint64_t RECV_SEED  = 0xb2b2b2b2b2b2b2b2ul;
    static const uint64_t MASK       = 0xfffffffffffffffful;

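    // Header carried by RNDV sends: priv[] holds the immediate data and a
    // pointer to the originating send_ctx, tail is a guard pattern (0xFAFA)
    // verified by the unexpected-RNDV callback.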
    struct rndv_hdr {
        uint64_t          priv[2];
        uint16_t          tail;
    } UCS_S_PACKED;

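    // Receive-side state shared with the UCT tag context callbacks; the
    // boolean flags record which completion path was actually taken.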
    struct recv_ctx {
        mapped_buffer     *mbuf;
        uct_tag_t         tag;
        uct_tag_t         tmask;
        bool              take_uct_desc;
        bool              comp;
        bool              unexp;
        bool              consumed;
        bool              sw_rndv;
        uct_tag_context_t uct_ctx;
        ucs_status_t      status;
    };

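    // Send-side state: rndv_op keeps the operation handle returned by
    // uct_ep_tag_rndv_zcopy, so the operation can be cancelled later if the
    // message arrives unexpectedly.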
    struct send_ctx {
        mapped_buffer    *mbuf;
        void             *rndv_op;
        uct_tag_t        tag;
        uint64_t         imm_data;
        uct_completion_t uct_comp;
        ucs_status_t     status;
        bool             sw_rndv;
        bool             comp;
        bool             unexp;
    };

    typedef ucs_status_t (test_tag::*send_func)(entity&, send_ctx&);

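    // Typical test flow (a sketch of what the helpers below implement; see
    // test_tag_expected for the real sequence):
    //
    //   recv_ctx r_ctx;
    //   init_recv_ctx(r_ctx, &recvbuf, tag);
    //   tag_post(receiver(), r_ctx);                 // post expected tag
    //   send_ctx s_ctx;
    //   init_send_ctx(s_ctx, &sendbuf, tag, reinterpret_cast<uint64_t>(&r_ctx));
    //   (this->*sfunc)(sender(), s_ctx);             // send with some send_func
    //   check_rx_completion(r_ctx, true, SEND_SEED); // verify matched receive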
    void init()
    {
        ucs_status_t status = uct_config_modify(m_iface_config,
                                                "RC_TM_ENABLE", "y");
        ASSERT_TRUE((status == UCS_OK) || (status == UCS_ERR_NO_ELEM));

        status = uct_config_modify(m_iface_config, "RC_TM_MP_SRQ_ENABLE", "no");
        ASSERT_TRUE((status == UCS_OK) || (status == UCS_ERR_NO_ELEM));

        uct_test::init();

        entity *sender = uct_test::create_entity(0ul, NULL, unexp_eager,
                                                 unexp_rndv,
                                                 reinterpret_cast<void*>(this),
                                                 reinterpret_cast<void*>(this));
        m_entities.push_back(sender);

        check_skip_test();

        if (UCT_DEVICE_TYPE_SELF == GetParam()->dev_type) {
            sender->connect(0, *sender, 0);
        } else {
            entity *receiver = uct_test::create_entity(0ul, NULL, unexp_eager,
                                                       unexp_rndv,
                                                       reinterpret_cast<void*>(this),
                                                       reinterpret_cast<void*>(this));
            m_entities.push_back(receiver);

            sender->connect(0, *receiver, 0);
        }
    }

    void init_send_ctx(send_ctx &s, mapped_buffer *b, uct_tag_t t, uint64_t i,
                       bool unexp_flow = true)
    {
        s.mbuf           = b;
        s.rndv_op        = NULL;
        s.tag            = t;
        s.imm_data       = i;
        s.uct_comp.count = 1;
        s.uct_comp.func  = send_completion;
        s.sw_rndv        = s.comp = false;
        s.unexp          = unexp_flow;
        s.status         = UCS_ERR_NO_PROGRESS;
    }

    void init_recv_ctx(recv_ctx &r, mapped_buffer *b, uct_tag_t t,
                       uct_tag_t m = MASK, bool uct_d = false)
    {
        r.mbuf                    = b;
        r.tag                     = t;
        r.tmask                   = m;
        r.uct_ctx.completed_cb    = completed;
        r.uct_ctx.tag_consumed_cb = tag_consumed;
        r.uct_ctx.rndv_cb         = sw_rndv_completed;
        r.take_uct_desc           = uct_d;
        r.status                  = UCS_ERR_NO_PROGRESS;
        r.comp = r.unexp = r.consumed = r.sw_rndv = false;
    }

    ucs_status_t tag_eager_short(entity &e, send_ctx &ctx)
    {
        ctx.status = uct_ep_tag_eager_short(e.ep(0), ctx.tag, ctx.mbuf->ptr(),
                                            ctx.mbuf->length());
        ctx.comp   = true;

        return ctx.status;
    }

    ucs_status_t tag_eager_bcopy(entity &e, send_ctx &ctx)
    {
        ssize_t status = uct_ep_tag_eager_bcopy(e.ep(0), ctx.tag,
                                                ctx.imm_data, mapped_buffer::pack,
                                                reinterpret_cast<void*>(ctx.mbuf),
                                                0);
        ctx.status = (status >= 0) ? UCS_OK : static_cast<ucs_status_t>(status);
        ctx.comp   = true;

        return ctx.status;
    }

    ucs_status_t tag_eager_zcopy(entity &e, send_ctx &ctx)
    {
        UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, ctx.mbuf->ptr(),
                                ctx.mbuf->length(), ctx.mbuf->memh(),
                                e.iface_attr().cap.tag.eager.max_iov);

        ucs_status_t status = uct_ep_tag_eager_zcopy(e.ep(0), ctx.tag,
                                                     ctx.imm_data, iov, iovcnt,
                                                     0, &ctx.uct_comp);
        if (status == UCS_INPROGRESS) {
            status = UCS_OK;
        }
        return status;
    }

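    // Posts a HW RNDV send; the returned operation handle is stored in
    // ctx.rndv_op. If the message lands unexpectedly on the receiver, no
    // RNDV_COMP arrives and the operation must be cancelled explicitly
    // (see test_tag_unexpected).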
    ucs_status_t tag_rndv_zcopy(entity &e, send_ctx &ctx)
    {
        rndv_hdr hdr = {{ctx.imm_data,
                         reinterpret_cast<uint64_t>(&ctx)
                        },
                        0xFAFA
                       };

        UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, ctx.mbuf->ptr(),
                                ctx.mbuf->length(), ctx.mbuf->memh(), 1);

        ctx.rndv_op = uct_ep_tag_rndv_zcopy(e.ep(0), ctx.tag, &hdr,
                                            sizeof(hdr), iov, iovcnt, 0,
                                            &ctx.uct_comp);

        return (UCS_PTR_IS_ERR(ctx.rndv_op)) ? UCS_PTR_STATUS(ctx.rndv_op) :
                                               UCS_OK;
    }

    ucs_status_t tag_rndv_cancel(entity &e, void *op)
    {
        return uct_ep_tag_rndv_cancel(e.ep(0), op);
    }

    ucs_status_t tag_rndv_request(entity &e, send_ctx &ctx)
    {
        ctx.sw_rndv = true;

        if (ctx.unexp) {
            // Unexpected flow, will need to analyze ctx data on the receiver
            rndv_hdr hdr = {{ctx.imm_data,
                             reinterpret_cast<uint64_t>(&ctx)
                            },
                            0xFAFA
                           };
            ctx.status = uct_ep_tag_rndv_request(e.ep(0), ctx.tag, &hdr,
                                                 sizeof(hdr), 0);
        } else {
            // Expected flow, send just plain data (will be stored in rx buf by HCA)
            ctx.status = uct_ep_tag_rndv_request(e.ep(0), ctx.tag, ctx.mbuf->ptr(),
                                                 ctx.mbuf->length(), 0);
        }
        ctx.comp = true;

        return ctx.status;
    }

    ucs_status_t tag_post(entity &e, recv_ctx &ctx)
    {
        UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, ctx.mbuf->ptr(),
                                ctx.mbuf->length(), ctx.mbuf->memh(), 1);
        return uct_iface_tag_recv_zcopy(e.iface(), ctx.tag, ctx.tmask,
                                        iov, iovcnt, &ctx.uct_ctx);
    }

    ucs_status_t tag_cancel(entity &e, recv_ctx &ctx, int force)
    {
        return uct_iface_tag_recv_cancel(e.iface(), &ctx.uct_ctx, force);
    }


    // When an expected message arrives, two callbacks should be invoked:
    // tag_consumed and completed (the unexpected callback should not be
    // called). The opposite holds when a message arrives unexpectedly.
    // When an expected SW RNDV request arrives, tag_consumed and the
    // sw_rndv callback should be invoked.
    void check_rx_completion(recv_ctx &ctx, bool is_expected, uint64_t seed,
                             ucs_status_t status = UCS_OK, bool is_sw_rndv = false)
    {
        EXPECT_EQ(ctx.consumed, is_expected);
        EXPECT_EQ(ctx.comp,     (is_expected && !is_sw_rndv));
        EXPECT_EQ(ctx.unexp,    (!is_expected && !is_sw_rndv));
        EXPECT_EQ(ctx.sw_rndv,  is_sw_rndv);
        EXPECT_EQ(ctx.status,   status);
        if (is_expected) {
            ctx.mbuf->pattern_check(seed);
        }
    }

    void check_tx_completion(send_ctx &ctx)
    {
        wait_for_flag(&ctx.comp);
        EXPECT_TRUE(ctx.comp);
        EXPECT_EQ(ctx.status, UCS_OK);
    }

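    // Expected flow: post the receive first and progress the interface, so
    // the tag lands on the HW matching list before the send is issued.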
    void test_tag_expected(send_func sfunc, size_t length = 75,
                           bool is_sw_rndv = false) {
        uct_tag_t tag = 11;

        if (RUNNING_ON_VALGRIND) {
            length = ucs_min(length, 128U);
        }

        mapped_buffer recvbuf(length, RECV_SEED, receiver());
        recv_ctx r_ctx;
        init_recv_ctx(r_ctx, &recvbuf, tag);
        ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

        short_progress_loop();

        mapped_buffer sendbuf(length, SEND_SEED, sender());
        send_ctx s_ctx;
        init_send_ctx(s_ctx, &sendbuf, tag, reinterpret_cast<uint64_t>(&r_ctx),
                      false);
        ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));

        // Max RNDV size can be quite big, use an increased timeout
        wait_for_flag(is_sw_rndv ? &r_ctx.sw_rndv : &r_ctx.comp,
                      3 * DEFAULT_TIMEOUT_SEC);

        check_rx_completion(r_ctx, true, SEND_SEED, UCS_OK, is_sw_rndv);

        // If it was an RNDV send, we need to wait for the send completion as well
        check_tx_completion(s_ctx);

        flush();
    }

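    // Unexpected flow: send before any receive is posted, so the message is
    // delivered through the unexpected callbacks (unexp_eager/unexp_rndv).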
    void test_tag_unexpected(send_func sfunc, size_t length = 75,
                             bool take_uct_desc = false)
    {
        uct_tag_t tag = 11;

        if (RUNNING_ON_VALGRIND) {
            length = ucs_min(length, 128U);
        }

        mapped_buffer recvbuf(length, RECV_SEED, receiver());
        mapped_buffer sendbuf(length, SEND_SEED, sender());
        recv_ctx r_ctx;
        init_recv_ctx(r_ctx, &recvbuf, tag, MASK, take_uct_desc);
        send_ctx s_ctx;
        init_send_ctx(s_ctx, &sendbuf, tag, reinterpret_cast<uint64_t>(&r_ctx));
        ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));

        wait_for_flag(&r_ctx.unexp);
        if (static_cast<send_func>(&test_tag::tag_rndv_zcopy) == sfunc) {
            // Need to cancel the original RNDV operation, because no RNDV_COMP
            // will be received (the message arrived unexpectedly and should be
            // handled by SW).
            ASSERT_UCS_OK(tag_rndv_cancel(sender(), s_ctx.rndv_op));
        }

        check_rx_completion(r_ctx, false, SEND_SEED);
        flush();
    }

    void test_tag_wrong_tag(send_func sfunc)
    {
        const size_t length = 65;
        uct_tag_t    tag    = 11;

        mapped_buffer sendbuf(length, SEND_SEED, sender());
        mapped_buffer recvbuf(length, RECV_SEED, receiver());

        // Post a modified tag so that the incoming message is reported as
        // unexpected and is not matched.
        recv_ctx r_ctx;
        init_recv_ctx(r_ctx, &recvbuf, tag + 1);
        send_ctx s_ctx;
        init_send_ctx(s_ctx, &sendbuf, tag, reinterpret_cast<uint64_t>(&r_ctx));

        ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));

        short_progress_loop();

        ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

        wait_for_flag(&r_ctx.unexp);

        // The message should be reported as unexpected and the buffer should
        // still be filled with the recv seed (unchanged), as the incoming tag
        // does not match the expected one
        check_rx_completion(r_ctx, false, RECV_SEED);
        ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
        flush();
    }

    void test_tag_mask(send_func sfunc)
    {
        const size_t length = 65;

        mapped_buffer recvbuf(length, RECV_SEED, receiver());

        // Post a tag and tag mask such that the sender's tag matches with the
        // tag mask applied, but is not exactly the same.
        recv_ctx r_ctx;
        init_recv_ctx(r_ctx, &recvbuf, 0xff, 0xff);
        ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

        short_progress_loop();

        mapped_buffer sendbuf(length, SEND_SEED, sender());
        send_ctx s_ctx;
        init_send_ctx(s_ctx, &sendbuf, 0xffff, reinterpret_cast<uint64_t>(&r_ctx));
        ASSERT_UCS_OK((this->*sfunc)(sender(), s_ctx));
        wait_for_flag(&r_ctx.comp);

        // Should be matched because the tags are equal with the tag mask applied.
        check_rx_completion(r_ctx, true, SEND_SEED);

        // If it was an RNDV send, we need to wait for the send completion as well
        check_tx_completion(s_ctx);
        flush();
    }

    ucs_status_t unexpected_handler(recv_ctx *ctx, void *data, unsigned flags)
    {
        if (ctx->take_uct_desc && (flags & UCT_CB_PARAM_FLAG_DESC)) {
            m_uct_descs.push_back(data);
            return UCS_INPROGRESS;
        } else {
            return UCS_OK;
        }
    }

    static void tag_consumed(uct_tag_context_t *self)
    {
        recv_ctx *user_ctx = ucs_container_of(self, recv_ctx, uct_ctx);
        user_ctx->consumed = true;
    }

    static void completed(uct_tag_context_t *self, uct_tag_t stag, uint64_t imm,
                          size_t length, ucs_status_t status)
    {
        recv_ctx *user_ctx = ucs_container_of(self, recv_ctx, uct_ctx);
        user_ctx->comp     = true;
        user_ctx->status   = status;
        EXPECT_EQ(user_ctx->tag, (stag & user_ctx->tmask));
        EXPECT_EQ(user_ctx->mbuf->length(), length);
    }

    static void sw_rndv_completed(uct_tag_context_t *self, uct_tag_t stag,
                                  const void *header, unsigned header_length,
                                  ucs_status_t status)
    {
        recv_ctx *user_ctx = ucs_container_of(self, recv_ctx, uct_ctx);
        user_ctx->sw_rndv  = true;
        user_ctx->status   = status;
        EXPECT_EQ(user_ctx->tag, (stag & user_ctx->tmask));
        EXPECT_EQ(user_ctx->mbuf->length(), header_length);
    }

    static ucs_status_t unexp_eager(void *arg, void *data, size_t length,
                                    unsigned flags, uct_tag_t stag,
                                    uint64_t imm, void **context)
    {
        recv_ctx *user_ctx = reinterpret_cast<recv_ctx*>(imm);
        user_ctx->unexp    = true;
        user_ctx->status   = UCS_OK;
        if (user_ctx->tag == stag) {
            memcpy(user_ctx->mbuf->ptr(), data, ucs_min(length,
                   user_ctx->mbuf->length()));
            user_ctx->mbuf->pattern_check(SEND_SEED);
        }

        test_tag *self = reinterpret_cast<test_tag*>(arg);
        return self->unexpected_handler(user_ctx, data, flags);
    }

    static ucs_status_t unexp_rndv(void *arg, unsigned flags, uint64_t stag,
                                   const void *header, unsigned header_length,
                                   uint64_t remote_addr, size_t length,
                                   const void *rkey_buf)
    {
        rndv_hdr *rhdr  = const_cast<rndv_hdr*>(static_cast<const rndv_hdr*>(header));
        recv_ctx *r_ctx = reinterpret_cast<recv_ctx*>(rhdr->priv[0]);
        send_ctx *s_ctx = reinterpret_cast<send_ctx*>(rhdr->priv[1]);
        uint16_t  tail  = rhdr->tail;
        r_ctx->unexp  = true;
        r_ctx->status = UCS_OK;

        EXPECT_EQ(tail, 0xFAFA);
        EXPECT_EQ(s_ctx->tag, stag);
        EXPECT_EQ(length, s_ctx->sw_rndv ? 0 : s_ctx->mbuf->length());
        EXPECT_EQ(remote_addr, s_ctx->sw_rndv ? 0ul :
                  reinterpret_cast<uint64_t>(s_ctx->mbuf->ptr()));

        test_tag *self = reinterpret_cast<test_tag*>(arg);
        return self->unexpected_handler(r_ctx, const_cast<void*>(header), flags);
    }

    static ucs_status_t am_handler(void *arg, void *data, size_t length,
                                   unsigned flags)
    {
        is_am_received = true;
        return UCS_OK;
    }

    static ucs_log_func_rc_t
    log_ep_destroy(const char *file, unsigned line, const char *function,
                   ucs_log_level_t level,
                   const ucs_log_component_config_t *comp_conf,
                   const char *message, va_list ap)
    {
        if (level == UCS_LOG_LEVEL_WARN) {
            // Ignore warnings about incomplete operations during ep destroy
            return UCS_LOG_FUNC_RC_STOP;
        }
        return UCS_LOG_FUNC_RC_CONTINUE;
    }

    static void send_completion(uct_completion_t *self, ucs_status_t status)
    {
        send_ctx *user_ctx = ucs_container_of(self, send_ctx, uct_comp);
        user_ctx->comp     = true;
        user_ctx->status   = status;
    }


protected:
    uct_test::entity& sender() {
        return **m_entities.begin();
    }

    uct_test::entity& receiver() {
        return **(m_entities.end() - 1);
    }

    std::vector<void*> m_uct_descs;

    static bool is_am_received;
};

bool test_tag::is_am_received = false;

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_short_expected,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_SHORT))
{
    test_tag_expected(static_cast<send_func>(&test_tag::tag_eager_short),
                      sender().iface_attr().cap.tag.eager.max_short);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_expected,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    test_tag_expected(static_cast<send_func>(&test_tag::tag_eager_bcopy),
                      sender().iface_attr().cap.tag.eager.max_bcopy);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_expected,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
{
    test_tag_expected(static_cast<send_func>(&test_tag::tag_eager_zcopy),
                      sender().iface_attr().cap.tag.eager.max_zcopy);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_rndv_zcopy_expected,
                     !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
{
    test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_zcopy),
                      sender().iface_attr().cap.tag.rndv.max_zcopy);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_unexpected,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_bcopy),
                        sender().iface_attr().cap.tag.eager.max_bcopy);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_unexpected,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
{
    test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_zcopy),
                        sender().iface_attr().cap.tag.eager.max_bcopy);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_rndv_zcopy_unexpected,
                     !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
{
    test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_zcopy));
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_wrong_tag,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    test_tag_wrong_tag(static_cast<send_func>(&test_tag::tag_eager_bcopy));
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_wrong_tag,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
{
    test_tag_wrong_tag(static_cast<send_func>(&test_tag::tag_eager_zcopy));
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_short_tag_mask,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_SHORT))
{
    test_tag_mask(static_cast<send_func>(&test_tag::tag_eager_short));
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_bcopy_tag_mask,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    test_tag_mask(static_cast<send_func>(&test_tag::tag_eager_bcopy));
}

UCS_TEST_SKIP_COND_P(test_tag, tag_eager_zcopy_tag_mask,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
{
    test_tag_mask(static_cast<send_func>(&test_tag::tag_eager_zcopy));
}

UCS_TEST_SKIP_COND_P(test_tag, tag_rndv_zcopy_tag_mask,
                     !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
{
    test_tag_mask(static_cast<send_func>(&test_tag::tag_rndv_zcopy));
}

UCS_TEST_SKIP_COND_P(test_tag, tag_hold_uct_desc,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                 UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
{
    int n = 10;
    int msg_size = ucs_min(sender().iface_attr().cap.tag.eager.max_bcopy,
                           sender().iface_attr().cap.tag.rndv.max_zcopy);
    for (int i = 0; i < n; ++i) {
        test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_bcopy),
                            msg_size, true);

        test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_zcopy),
                            msg_size, true);
    }

    for (ucs::ptr_vector<void>::const_iterator iter = m_uct_descs.begin();
         iter != m_uct_descs.end(); ++iter)
    {
        uct_iface_release_desc(*iter);
    }
}


UCS_TEST_SKIP_COND_P(test_tag, tag_send_no_tag,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    uct_iface_set_am_handler(receiver().iface(), 0, am_handler, NULL, 0);
    mapped_buffer lbuf(200, SEND_SEED, sender());
    ssize_t len = uct_ep_am_bcopy(sender().ep(0), 0, mapped_buffer::pack,
                                  reinterpret_cast<void*>(&lbuf), 0);
    EXPECT_EQ(lbuf.length(), static_cast<size_t>(len));
    wait_for_flag(&is_am_received);
    EXPECT_TRUE(is_am_received);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_cancel_force,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    const size_t length = 128;
    mapped_buffer recvbuf(length, RECV_SEED, receiver());
    recv_ctx r_ctx;
    init_recv_ctx(r_ctx, &recvbuf, 1);

    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
    short_progress_loop(200);
    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));

    short_progress_loop();

    mapped_buffer sendbuf(length, SEND_SEED, sender());
    send_ctx s_ctx;
    init_send_ctx(s_ctx, &sendbuf, 1, reinterpret_cast<uint64_t>(&r_ctx));
    ASSERT_UCS_OK(tag_eager_bcopy(sender(), s_ctx));

    // The message should arrive unexpectedly, since the tag was cancelled
    // on the receiver.
    wait_for_flag(&r_ctx.unexp);
    check_rx_completion(r_ctx, false, SEND_SEED);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_cancel_noforce,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    const size_t length = 128;
    mapped_buffer recvbuf(length, RECV_SEED, receiver());
    recv_ctx r_ctx;
    init_recv_ctx(r_ctx, &recvbuf, 1);

    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
    short_progress_loop(200);
    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 0));

    wait_for_flag(&r_ctx.comp);

    // Check that the completed callback has been called with CANCELED status
    // (because 0 was passed as the force parameter to cancel).
    EXPECT_TRUE(r_ctx.comp);
    EXPECT_EQ(r_ctx.status, UCS_ERR_CANCELED);
}

UCS_TEST_SKIP_COND_P(test_tag, tag_limit,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    const size_t length = 32;
    ucs::ptr_vector<recv_ctx> rctxs;
    ucs::ptr_vector<mapped_buffer> rbufs;
    ucs_status_t status;

    do {
        recv_ctx *rctx_p     = new recv_ctx();
        mapped_buffer *buf_p = new mapped_buffer(length, RECV_SEED, receiver());
        init_recv_ctx(*rctx_p, buf_p, 1);
        rctxs.push_back(rctx_p);
        rbufs.push_back(buf_p);
        status = tag_post(receiver(), *rctx_p);
        // Make sure send resources are acknowledged, as we are waiting
        // for tag space exhaustion.
        short_progress_loop();
    } while (status == UCS_OK);

    EXPECT_EQ(status, UCS_ERR_EXCEEDS_LIMIT);

    // Cancel one of the postings
    ASSERT_UCS_OK(tag_cancel(receiver(), rctxs.at(0), 1));
    short_progress_loop();

    // Check that we can post again within a reasonable time
    ucs_time_t deadline = ucs_get_time() + ucs_time_from_sec(20.0);
    do {
        status = tag_post(receiver(), rctxs.at(0));
    } while ((ucs_get_time() < deadline) && (status == UCS_ERR_EXCEEDS_LIMIT));
    ASSERT_UCS_OK(status);

    // Remove the posted tags from HW
    for (ucs::ptr_vector<recv_ctx>::const_iterator iter = rctxs.begin();
         iter != rctxs.end() - 1; ++iter) {
        ASSERT_UCS_OK(tag_cancel(receiver(), **iter, 1));
    }
}

UCS_TEST_SKIP_COND_P(test_tag, tag_post_same,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    const size_t length = 128;
    mapped_buffer recvbuf(length, RECV_SEED, receiver());
    recv_ctx r_ctx;
    init_recv_ctx(r_ctx, &recvbuf, 1);

    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    // Can't post the same buffer until it is completed/cancelled
    ucs_status_t status = tag_post(receiver(), r_ctx);
    EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS);

    // Cancel with force, should be able to re-post immediately
    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    // Cancel without force, should be able to re-post only once the
    // completion callback is invoked
    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 0));
    status = tag_post(receiver(), r_ctx);
    EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS); // no completion yet

    wait_for_flag(&r_ctx.comp); // cancel completed, should be able to post
    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    // Now send something to trigger rx completion
    init_recv_ctx(r_ctx, &recvbuf, 1); // reinit rx to clear completed states
    mapped_buffer sendbuf(length, SEND_SEED, sender());
    send_ctx s_ctx;
    init_send_ctx(s_ctx, &sendbuf, 1, reinterpret_cast<uint64_t>(&r_ctx));
    ASSERT_UCS_OK(tag_eager_bcopy(sender(), s_ctx));

    wait_for_flag(&r_ctx.comp); // message consumed, should be able to post
    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));

    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
}

UCS_TEST_SKIP_COND_P(test_tag, sw_rndv_expected,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                 UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
{
    test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_request),
                      sender().iface_attr().cap.tag.rndv.max_hdr, true);
}

UCS_TEST_SKIP_COND_P(test_tag, rndv_limit,
                     !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
{
    mapped_buffer sendbuf(8, SEND_SEED, sender());
    ucs::ptr_vector<send_ctx> sctxs;
    ucs_status_t status;
    send_ctx *sctx_p;
    void *op;

    do {
        sctx_p = new send_ctx;
        init_send_ctx(*sctx_p, &sendbuf, 0xffff, 0);
        status = tag_rndv_zcopy(sender(), *sctx_p);
        sctxs.push_back(sctx_p);
    } while (status == UCS_OK);

    EXPECT_EQ(status, UCS_ERR_NO_RESOURCE);

    for (ucs::ptr_vector<send_ctx>::const_iterator iter = sctxs.begin();
         iter != sctxs.end(); ++iter)
    {
        op = (*iter)->rndv_op;
        if (!UCS_PTR_IS_ERR(op)) {
            tag_rndv_cancel(sender(), op);
        }
    }

    ucs_log_push_handler(log_ep_destroy);
    sender().destroy_eps();
    ucs_log_pop_handler();
}

UCS_TEST_SKIP_COND_P(test_tag, sw_rndv_unexpected,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                 UCT_IFACE_FLAG_TAG_RNDV_ZCOPY))
{
    test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_request));
}

UCT_TAG_INSTANTIATE_TEST_CASE(test_tag)


#if defined (ENABLE_STATS) && IBV_HW_TM
extern "C" {
#include <uct/api/uct.h>
#include <uct/ib/rc/accel/rc_mlx5_common.h>
#include <uct/ib/base/ib_verbs.h>
}

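// Statistics tests: verify the TX op/byte counters on the sender EP and the
// tag-matching counters on the receiver interface.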
class test_tag_stats : public test_tag {
public:
    void init() {
        stats_activate();
        test_tag::init();
    }

    void cleanup() {
        test_tag::cleanup();
        stats_restore();
    }

    ucs_stats_node_t *ep_stats(const entity &e)
    {
        return ucs_derived_of(e.ep(0), uct_base_ep_t)->stats;
    }

    ucs_stats_node_t *iface_stats(const entity &e)
    {
        return ucs_derived_of(e.iface(), uct_rc_mlx5_iface_common_t)->tm.stats;
    }

    void provoke_sync(const entity &e)
    {
        uct_rc_mlx5_iface_common_t *iface;

        iface = ucs_derived_of(e.iface(), uct_rc_mlx5_iface_common_t);

        // Counters are synced every IBV_DEVICE_MAX_UNEXP_COUNT ops. Set the
        // counter to one op before the threshold, so that the next unexpected
        // message triggers a HW and SW counter sync.
        iface->tm.unexpected_cnt = IBV_DEVICE_MAX_UNEXP_COUNT - 1;
    }

    void check_tx_counters(int op, uint64_t op_val, int type, size_t len)
    {
        uint64_t v;

        v = UCS_STATS_GET_COUNTER(ep_stats(sender()), op);
        EXPECT_EQ(op_val, v);

        // With valgrind, messages of reduced size are sent
        if (!RUNNING_ON_VALGRIND) {
            v = UCS_STATS_GET_COUNTER(ep_stats(sender()), type);
            EXPECT_EQ(len, v);
        }
    }

    void check_rx_counter(int op, uint64_t val, entity &e)
    {
        EXPECT_EQ(val, UCS_STATS_GET_COUNTER(iface_stats(e), op));
    }
};

UCS_TEST_SKIP_COND_P(test_tag_stats, tag_expected_eager,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_SHORT |
                                 UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                 UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
{
    std::pair<send_func, std::pair<size_t, int> > sfuncs[3] = {
                std::make_pair(static_cast<send_func>(&test_tag::tag_eager_short),
                std::make_pair(sender().iface_attr().cap.tag.eager.max_short,
                static_cast<int>(UCT_EP_STAT_BYTES_SHORT))),

                std::make_pair(static_cast<send_func>(&test_tag::tag_eager_bcopy),
                std::make_pair(sender().iface_attr().cap.tag.eager.max_bcopy,
                static_cast<int>(UCT_EP_STAT_BYTES_BCOPY))),

                std::make_pair(static_cast<send_func>(&test_tag::tag_eager_zcopy),
                std::make_pair(sender().iface_attr().cap.tag.eager.max_zcopy,
                static_cast<int>(UCT_EP_STAT_BYTES_ZCOPY)))
    };

    for (int i = 0; i < 3; ++i) {
        test_tag_expected(sfuncs[i].first, sfuncs[i].second.first);
        check_tx_counters(UCT_EP_STAT_TAG, i + 1,
                          sfuncs[i].second.second,
                          sfuncs[i].second.first);
        check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_EXP, i + 1, receiver());
    }
}

UCS_TEST_SKIP_COND_P(test_tag_stats, tag_unexpected_eager,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                 UCT_IFACE_FLAG_TAG_EAGER_ZCOPY))
{
    std::pair<send_func, std::pair<size_t, int> > sfuncs[2] = {
                std::make_pair(static_cast<send_func>(&test_tag::tag_eager_bcopy),
                std::make_pair(sender().iface_attr().cap.tag.eager.max_bcopy,
                static_cast<int>(UCT_EP_STAT_BYTES_BCOPY))),

                std::make_pair(static_cast<send_func>(&test_tag::tag_eager_zcopy),
                std::make_pair(sender().iface_attr().cap.tag.eager.max_zcopy,
                static_cast<int>(UCT_EP_STAT_BYTES_ZCOPY)))
    };

    for (int i = 0; i < 2; ++i) {
        test_tag_unexpected(sfuncs[i].first, sfuncs[i].second.first);
        check_tx_counters(UCT_EP_STAT_TAG, i + 1,
                          sfuncs[i].second.second,
                          sfuncs[i].second.first);
        check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_EAGER_UNEXP, i + 1, receiver());
    }
}

UCS_TEST_SKIP_COND_P(test_tag_stats, tag_list_ops,
                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    mapped_buffer recvbuf(32, RECV_SEED, receiver());
    recv_ctx rctx;

    init_recv_ctx(rctx, &recvbuf, 1);

    ASSERT_UCS_OK(tag_post(receiver(), rctx));
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_ADD, 1ul, receiver());

    ASSERT_UCS_OK(tag_cancel(receiver(), rctx, 1));
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_DEL, 1ul, receiver());

    // Every ADD and DEL is paired with a SYNC, but the stats counter is
    // increased only when a separate SYNC op is issued. So, we expect it
    // to be 0 after the ADD and DEL operations.
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_SYNC, 0ul, receiver());

    // Provoke a real SYNC op and send a message unexpectedly
    provoke_sync(receiver());
    test_tag_unexpected(static_cast<send_func>(&test_tag::tag_eager_bcopy));
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_LIST_SYNC, 1ul, receiver());
}


UCS_TEST_SKIP_COND_P(test_tag_stats, tag_rndv,
                     !check_caps(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY |
                                 UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
{
    size_t len = sender().iface_attr().cap.tag.rndv.max_zcopy / 8;

    // Check RNDV_UNEXP on the receiver
    test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_zcopy), len);
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_UNEXP, 1ul, receiver());

    // Check that the sender receives RNDV_FIN in case of an expected rndv message
    test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_zcopy), len);
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_FIN, 1ul, sender());


    // Check RNDV_REQ_UNEXP on the receiver
    test_tag_unexpected(static_cast<send_func>(&test_tag::tag_rndv_request));
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_REQ_UNEXP, 1ul, receiver());

    // Check RNDV_REQ_EXP on the receiver
    test_tag_expected(static_cast<send_func>(&test_tag::tag_rndv_request),
                     sender().iface_attr().cap.tag.rndv.max_hdr, true);
    check_rx_counter(UCT_RC_MLX5_STAT_TAG_RX_RNDV_REQ_EXP, 1ul, receiver());
}

UCT_TAG_INSTANTIATE_TEST_CASE(test_tag_stats)

#endif


#if IBV_HW_TM

extern "C" {
#include <uct/ib/rc/accel/rc_mlx5_common.h>
}

// TODO: Unite with test_tag + add GRH testing for DC
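// MP (multi-packet) XRQ tests: with RC_TM_MP_SRQ_ENABLE, a tagged message is
// delivered as multiple MTU-sized fragments, each raising the unexpected
// callback with UCT_CB_PARAM_FLAG_FIRST/UCT_CB_PARAM_FLAG_MORE flags
// describing its position in the message.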
class test_tag_mp_xrq : public uct_test {
public:
    static const uint64_t SEND_SEED = 0xa1a1a1a1a1a1a1a1ul;
    static const uint64_t AM_ID     = 1;
    typedef void (test_tag_mp_xrq::*send_func)(mapped_buffer*);

    virtual void init();
    test_tag_mp_xrq();
    uct_rc_mlx5_iface_common_t* rc_mlx5_iface(entity &e);
    void send_eager_bcopy(mapped_buffer *buf);
    void send_eager_zcopy(mapped_buffer *buf);
    void send_rndv_zcopy(mapped_buffer *buf);
    void send_rndv_request(mapped_buffer *buf);
    void send_am_bcopy(mapped_buffer *buf);
    void test_common(send_func sfunc, size_t num_segs, size_t exp_segs = 1,
                     bool is_eager = true);

    static ucs_status_t am_handler(void *arg, void *data, size_t length,
                                   unsigned flags);

    static ucs_status_t unexp_eager(void *arg, void *data, size_t length,
                                    unsigned flags, uct_tag_t stag,
                                    uint64_t imm, void **context);

    static ucs_status_t unexp_rndv(void *arg, unsigned flags, uint64_t stag,
                                   const void *header, unsigned header_length,
                                   uint64_t remote_addr, size_t length,
                                   const void *rkey_buf);

protected:
    static size_t      m_rx_counter;
    std::vector<void*> m_uct_descs;
    bool               m_hold_uct_desc;

    uct_test::entity& sender() {
        return **m_entities.begin();
    }

    uct_test::entity& receiver() {
        return **(m_entities.end() - 1);
    }

private:
    ucs_status_t unexp_handler(void *data, unsigned flags, uint64_t imm,
                               void **context);
    ucs_status_t handle_uct_desc(void *data, unsigned flags);
    void set_env_var_or_skip(void *config, const char *var, const char *val);
    size_t           m_max_hdr;
    bool             m_first_received;
    bool             m_last_received;
    uct_completion_t m_uct_comp;
};

size_t test_tag_mp_xrq::m_rx_counter = 0;

test_tag_mp_xrq::test_tag_mp_xrq() : m_hold_uct_desc(false),
                                     m_first_received(false),
                                     m_last_received(false)
{
    m_max_hdr        = sizeof(ibv_tmh) + sizeof(ibv_rvh);
    m_uct_comp.count = 512; // We do not need the completion func to be invoked
    m_uct_comp.func  = NULL;
}

uct_rc_mlx5_iface_common_t* test_tag_mp_xrq::rc_mlx5_iface(entity &e)
{
    return ucs_derived_of(e.iface(), uct_rc_mlx5_iface_common_t);
}

void test_tag_mp_xrq::set_env_var_or_skip(void *config, const char *var,
                                          const char *val)
{
    ucs_status_t status = uct_config_modify(config, var, val);
    if (status != UCS_OK) {
        ucs_warn("%s", ucs_status_string(status));
        UCS_TEST_SKIP_R(std::string("Can't set ") + var);
    }
}

void test_tag_mp_xrq::init()
{
    set_env_var_or_skip(m_iface_config, "RC_TM_ENABLE", "y");
    set_env_var_or_skip(m_iface_config, "RC_TM_MP_SRQ_ENABLE", "try");
    set_env_var_or_skip(m_iface_config, "RC_TM_MP_NUM_STRIDES", "8");
    set_env_var_or_skip(m_md_config, "MLX5_DEVX_OBJECTS", "dct,dcsrq,rcsrq,rcqp");

    uct_test::init();

    entity *sender = uct_test::create_entity(0ul, NULL, unexp_eager, unexp_rndv,
                                             reinterpret_cast<void*>(this),
                                             reinterpret_cast<void*>(this));
    m_entities.push_back(sender);

    entity *receiver = uct_test::create_entity(0ul, NULL, unexp_eager, unexp_rndv,
                                               reinterpret_cast<void*>(this),
                                               reinterpret_cast<void*>(this));
    m_entities.push_back(receiver);

    if (!UCT_RC_MLX5_MP_ENABLED(rc_mlx5_iface(test_tag_mp_xrq::sender()))) {
        UCS_TEST_SKIP_R("No MP XRQ support");
    }

    sender->connect(0, *receiver, 0);

    uct_iface_set_am_handler(receiver->iface(), AM_ID, am_handler, this, 0);
}

void test_tag_mp_xrq::send_eager_bcopy(mapped_buffer *buf)
{
    ssize_t len = uct_ep_tag_eager_bcopy(sender().ep(0), 0x11,
                                         reinterpret_cast<uint64_t>(this),
                                         mapped_buffer::pack,
                                         reinterpret_cast<void*>(buf), 0);

    EXPECT_EQ(buf->length(), static_cast<size_t>(len));
}

void test_tag_mp_xrq::send_eager_zcopy(mapped_buffer *buf)
{
    UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, buf->ptr(), buf->length(), buf->memh(),
                            sender().iface_attr().cap.tag.eager.max_iov);

    ucs_status_t status = uct_ep_tag_eager_zcopy(sender().ep(0), 0x11,
                                                 reinterpret_cast<uint64_t>(this),
                                                 iov, iovcnt, 0, &m_uct_comp);
    ASSERT_UCS_OK_OR_INPROGRESS(status);
}

void test_tag_mp_xrq::send_rndv_zcopy(mapped_buffer *buf)
{
    UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, buf->ptr(), buf->length(), buf->memh(),
                            sender().iface_attr().cap.tag.rndv.max_iov);

    uint64_t dummy_hdr       = 0xFAFA;
    ucs_status_ptr_t rndv_op = uct_ep_tag_rndv_zcopy(sender().ep(0), 0x11, &dummy_hdr,
                                                     sizeof(dummy_hdr), iov,
                                                     iovcnt, 0, &m_uct_comp);
    ASSERT_FALSE(UCS_PTR_IS_ERR(rndv_op));

    // No real RNDV will be performed, cancel the op to avoid an mpool
    // warning on exit
    ASSERT_UCS_OK(uct_ep_tag_rndv_cancel(sender().ep(0), rndv_op));
}

void test_tag_mp_xrq::send_rndv_request(mapped_buffer *buf)
{
    size_t size = sender().iface_attr().cap.tag.rndv.max_hdr;
    void *hdr   = alloca(size);

    ASSERT_UCS_OK(uct_ep_tag_rndv_request(sender().ep(0), 0x11, hdr, size, 0));
}

void test_tag_mp_xrq::send_am_bcopy(mapped_buffer *buf)
{
    ssize_t len = uct_ep_am_bcopy(sender().ep(0), AM_ID, mapped_buffer::pack,
                                  reinterpret_cast<void*>(buf), 0);

    EXPECT_EQ(buf->length(), static_cast<size_t>(len));
}

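// Helper for all MP XRQ tests: sends one message that spans num_segs segments
// (minus room for the headers) and waits until the receive callback has fired
// exp_segs times; for non-eager flows a single callback is expected.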
void test_tag_mp_xrq::test_common(send_func sfunc, size_t num_segs,
                                  size_t exp_segs, bool is_eager)
{
    size_t seg_size  = rc_mlx5_iface(sender())->super.super.config.seg_size;
    size_t seg_num   = is_eager ? num_segs : 1;
    size_t exp_val   = is_eager ? exp_segs : 1;
    size_t size      = (seg_size * seg_num) - m_max_hdr;
    m_rx_counter     = 0;
    m_first_received = m_last_received = false;

    EXPECT_TRUE(size <= sender().iface_attr().cap.tag.eager.max_bcopy);
    mapped_buffer buf(size, SEND_SEED, sender());

    (this->*sfunc)(&buf);

    wait_for_value(&m_rx_counter, exp_val, true);
    EXPECT_EQ(exp_val, m_rx_counter);
    EXPECT_EQ(is_eager, m_first_received); // relevant for eager only
    EXPECT_EQ(is_eager, m_last_received);  // relevant for eager only
}

ucs_status_t test_tag_mp_xrq::handle_uct_desc(void *data, unsigned flags)
{
    if ((flags & UCT_CB_PARAM_FLAG_DESC) && m_hold_uct_desc) {
        m_uct_descs.push_back(data);
        return UCS_INPROGRESS;
    }

    return UCS_OK;
}

ucs_status_t test_tag_mp_xrq::am_handler(void *arg, void *data, size_t length,
                                         unsigned flags)
{
    EXPECT_TRUE(flags & UCT_CB_PARAM_FLAG_FIRST);
    EXPECT_FALSE(flags & UCT_CB_PARAM_FLAG_MORE);

    m_rx_counter++;

    test_tag_mp_xrq *self = reinterpret_cast<test_tag_mp_xrq*>(arg);
    return self->handle_uct_desc(data, flags);
}

ucs_status_t test_tag_mp_xrq::unexp_handler(void *data, unsigned flags,
                                            uint64_t imm, void **context)
{
    void *self = reinterpret_cast<void*>(this);

    if (flags & UCT_CB_PARAM_FLAG_FIRST) {
        // Set the message context which will be passed back with the rest of
        // the message fragments
        *context         = self;
        m_first_received = true;

    } else {
        // Check that the correct message context is passed with all fragments
        EXPECT_EQ(self, *context);
    }

    if (!(flags & UCT_CB_PARAM_FLAG_MORE)) {
        // The last fragment should contain a valid immediate value
        EXPECT_EQ(reinterpret_cast<uint64_t>(this), imm);
        m_last_received = true;
    } else {
        // The immediate value is passed with the last fragment only
        EXPECT_EQ(0ul, imm);
    }

    return handle_uct_desc(data, flags);
}

ucs_status_t test_tag_mp_xrq::unexp_eager(void *arg, void *data, size_t length,
                                          unsigned flags, uct_tag_t stag,
                                          uint64_t imm, void **context)
{
    test_tag_mp_xrq *self = reinterpret_cast<test_tag_mp_xrq*>(arg);

    m_rx_counter++;

    return self->unexp_handler(data, flags, imm, context);
}

ucs_status_t test_tag_mp_xrq::unexp_rndv(void *arg, unsigned flags,
                                         uint64_t stag, const void *header,
                                         unsigned header_length,
                                         uint64_t remote_addr, size_t length,
                                         const void *rkey_buf)
{
    EXPECT_FALSE(flags & UCT_CB_PARAM_FLAG_FIRST);
    EXPECT_FALSE(flags & UCT_CB_PARAM_FLAG_MORE);

    m_rx_counter++;

    return UCS_OK;
}

UCS_TEST_P(test_tag_mp_xrq, config)
{
    uct_rc_mlx5_iface_common_t *iface = rc_mlx5_iface(sender());

    // MP XRQ is supported with tag offload only
    EXPECT_TRUE(UCT_RC_MLX5_TM_ENABLED(iface));

    // With MP XRQ, the segment size should be equal to the MTU, because the
    // HW generates a CQE per received MTU
    size_t mtu = uct_ib_mtu_value(uct_ib_iface_port_attr(&iface->super.super)->active_mtu);
    EXPECT_EQ(mtu, iface->super.super.config.seg_size);

    const uct_iface_attr *attrs = &sender().iface_attr();

    // Max tag bcopy is limited by the tag TX memory pool
    EXPECT_EQ(iface->tm.max_bcopy - sizeof(ibv_tmh),
              attrs->cap.tag.eager.max_bcopy);
    EXPECT_GT(attrs->cap.tag.eager.max_bcopy,
              iface->super.super.config.seg_size);

    // Max tag zcopy is limited by the maximal IB message size
    EXPECT_EQ(uct_ib_iface_port_attr(&iface->super.super)->max_msg_sz - sizeof(ibv_tmh),
              attrs->cap.tag.eager.max_zcopy);

    // The maximal AM size should not exceed the segment size, so AMs always
    // arrive in a single-fragment packet (with the header it should be
    // strictly less)
    EXPECT_LT(attrs->cap.am.max_bcopy, iface->super.super.config.seg_size);
    EXPECT_LT(attrs->cap.am.max_zcopy, iface->super.super.config.seg_size);
}

UCS_TEST_P(test_tag_mp_xrq, desc_release)
{
    m_hold_uct_desc = true; // We want to "hold" UCT memory descriptors
    std::pair<send_func, bool> sfuncs[5] = {
              std::make_pair(&test_tag_mp_xrq::send_eager_bcopy,  true),
              std::make_pair(&test_tag_mp_xrq::send_eager_zcopy,  true),
              std::make_pair(&test_tag_mp_xrq::send_rndv_zcopy,   false),
              std::make_pair(&test_tag_mp_xrq::send_rndv_request, false),
              std::make_pair(&test_tag_mp_xrq::send_am_bcopy,     false)
    };

    for (int i = 0; i < 5; ++i) {
        test_common(sfuncs[i].first, 3, 3, sfuncs[i].second);
    }

    for (ucs::ptr_vector<void>::const_iterator iter = m_uct_descs.begin();
         iter != m_uct_descs.end(); ++iter)
    {
        uct_iface_release_desc(*iter);
    }
}

UCS_TEST_P(test_tag_mp_xrq, am)
{
    test_common(&test_tag_mp_xrq::send_am_bcopy, 1, 1, false);
}

UCS_TEST_P(test_tag_mp_xrq, bcopy_eager_only)
{
    test_common(&test_tag_mp_xrq::send_eager_bcopy, 1);
}

UCS_TEST_P(test_tag_mp_xrq, zcopy_eager_only)
{
    test_common(&test_tag_mp_xrq::send_eager_zcopy, 1);
}

UCS_TEST_P(test_tag_mp_xrq, bcopy_eager)
{
    test_common(&test_tag_mp_xrq::send_eager_bcopy, 5, 5);
}

UCS_TEST_P(test_tag_mp_xrq, zcopy_eager)
{
    test_common(&test_tag_mp_xrq::send_eager_zcopy, 5, 5);
}

UCS_TEST_P(test_tag_mp_xrq, rndv_zcopy)
{
    test_common(&test_tag_mp_xrq::send_rndv_zcopy, 1, 1, false);
}

UCS_TEST_P(test_tag_mp_xrq, rndv_request)
{
    test_common(&test_tag_mp_xrq::send_rndv_request, 1, 1, false);
}

UCT_TAG_INSTANTIATE_TEST_CASE(test_tag_mp_xrq)

#endif