1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7 
8 #include "test_ucp_tag.h"
9 
10 #include "ucp_datatype.h"
11 
12 extern "C" {
13 #include <ucp/core/ucp_resource.h>
14 #include <ucp/core/ucp_ep.inl>
15 #include <ucs/datastruct/queue.h>
16 }
17 
18 #include <iostream>
19 
20 
21 class test_ucp_tag_xfer : public test_ucp_tag {
22 public:
23     enum {
24         VARIANT_DEFAULT,
25         VARIANT_ERR_HANDLING,
26         VARIANT_RNDV_PUT_ZCOPY,
27         VARIANT_RNDV_GET_ZCOPY,
28         VARIANT_RNDV_AUTO,
29         VARIANT_SEND_NBR,
30     };
31 
test_ucp_tag_xfer()32     test_ucp_tag_xfer() {
33         // TODO: test offload and offload MP as different variants
34         enable_tag_mp_offload();
35 
36         if (RUNNING_ON_VALGRIND) {
37             // Alow using TM MP offload for messages with a size of at least
38             // 10000 bytes by setting HW TM segment size to 10 kB, since each
39             // packet in TM MP offload is MTU-size buffer (i.e., in most cases
40             // it is 4 kB segments)
41             m_env.push_back(new ucs::scoped_setenv("UCX_RC_TM_SEG_SIZE", "10k"));
42             m_env.push_back(new ucs::scoped_setenv("UCX_TCP_RX_SEG_SIZE", "8k"));
43         }
44     }
45 
init()46     virtual void init() {
47         if (GetParam().variant == VARIANT_RNDV_PUT_ZCOPY) {
48             modify_config("RNDV_SCHEME", "put_zcopy");
49         } else if (GetParam().variant == VARIANT_RNDV_GET_ZCOPY) {
50             modify_config("RNDV_SCHEME", "get_zcopy");
51         } else if (GetParam().variant == VARIANT_RNDV_AUTO) {
52             modify_config("RNDV_SCHEME", "auto");
53         }
54         modify_config("MAX_EAGER_LANES", "2");
55         modify_config("MAX_RNDV_LANES", "2");
56 
57         test_ucp_tag::init();
58     }
59 
skip_on_ib_dc()60     bool skip_on_ib_dc() {
61 #if HAVE_DC_DV
62         // skip due to DCI stuck bug
63         return has_transport("dc_x");
64 #else
65         return false;
66 #endif
67     }
68 
69     std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)70     static enum_test_params(const ucp_params_t& ctx_params,
71                             const std::string& name,
72                             const std::string& test_case_name,
73                             const std::string& tls)
74     {
75         std::vector<ucp_test_param> result;
76         generate_test_params_variant(ctx_params, name, test_case_name, tls,
77                                      VARIANT_DEFAULT, result);
78         generate_test_params_variant(ctx_params, name,
79                                      test_case_name + "/err_handling_mode_peer",
80                                      tls, VARIANT_ERR_HANDLING, result);
81         generate_test_params_variant(ctx_params, name,
82                                      test_case_name + "/rndv_put_zcopy", tls,
83                                      VARIANT_RNDV_PUT_ZCOPY, result);
84         generate_test_params_variant(ctx_params, name,
85                                      test_case_name + "/rndv_get_zcopy", tls,
86                                      VARIANT_RNDV_GET_ZCOPY, result);
87         generate_test_params_variant(ctx_params, name,
88                                      test_case_name + "/rndv_auto", tls,
89                                      VARIANT_RNDV_AUTO, result);
90         generate_test_params_variant(ctx_params, name,
91                                      test_case_name + "/send_nbr", tls,
92                                      VARIANT_SEND_NBR, result);
93         return result;
94     }
95 
get_ep_params()96     virtual ucp_ep_params_t get_ep_params() {
97         ucp_ep_params_t ep_params = test_ucp_tag::get_ep_params();
98         if (GetParam().variant == VARIANT_ERR_HANDLING) {
99             ep_params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE;
100             ep_params.err_mode    = UCP_ERR_HANDLING_MODE_PEER;
101         }
102         return ep_params;
103     }
104 
is_err_handling() const105     bool is_err_handling() const {
106         return GetParam().variant == VARIANT_ERR_HANDLING;
107     }
108 
skip_err_handling() const109     void skip_err_handling() const {
110         if (is_err_handling()) {
111             UCS_TEST_SKIP_R("err_handling");
112         }
113     }
114 
115     void test_xfer_contig(size_t size, bool expected, bool sync, bool truncated);
116     void test_xfer_generic(size_t size, bool expected, bool sync, bool truncated);
117     void test_xfer_iov(size_t size, bool expected, bool sync, bool truncated);
118     void test_xfer_generic_err(size_t size, bool expected, bool sync, bool truncated);
119 
120 protected:
121     typedef void (test_ucp_tag_xfer::* xfer_func_t)(size_t size, bool expected,
122                                                     bool sync, bool truncated);
123 
124     size_t do_xfer(const void *sendbuf, void *recvbuf, size_t count,
125                    ucp_datatype_t send_dt, ucp_datatype_t recv_dt,
126                    bool expected, bool sync, bool truncated);
127 
128     void test_xfer(xfer_func_t func, bool expected, bool sync, bool truncated);
129     void test_run_xfer(bool send_contig, bool recv_contig,
130                        bool expected, bool sync, bool truncated);
131     void test_xfer_prepare_bufs(uint8_t *sendbuf, uint8_t *recvbuf, size_t count,
132                                 bool send_contig, bool recv_contig,
133                                 ucp_datatype_t *send_dt,
134                                 ucp_datatype_t *recv_dt);
135     void test_xfer_probe(bool send_contig, bool recv_contig,
136                          bool expected, bool sync);
137 
138     void test_xfer_len_offset();
139 
140 private:
141     request* do_send(const void *sendbuf, size_t count, ucp_datatype_t dt, bool sync);
142 
143     static const uint64_t SENDER_TAG = 0x111337;
144     static const uint64_t RECV_MASK  = 0xffff;
145     static const uint64_t RECV_TAG   = 0x1337;
146 
147 };
148 
check_buffers(const std::vector<char> & sendbuf,const std::vector<char> & recvbuf,size_t recvd,size_t send_iovcnt,size_t recv_iovcnt,size_t size,bool expected,bool sync,const std::string datatype)149 int check_buffers(const std::vector<char> &sendbuf, const std::vector<char> &recvbuf,
150                   size_t recvd, size_t send_iovcnt, size_t recv_iovcnt,
151                   size_t size, bool expected, bool sync, const std::string datatype)
152 {
153     int buffers_equal = memcmp(sendbuf.data(), recvbuf.data(), recvd);
154     if (buffers_equal) {
155         std::cout << "\n";
156         ucs::detail::message_stream ms("INFO");
157         for (size_t it = 0; it < recvd; ++it) {
158             if (sendbuf[it] != recvbuf[it]) {
159                 ms << datatype << ':'
160                    << " send_iovcnt=" << std::dec << send_iovcnt
161                    << " recv_iovcnt=" << recv_iovcnt << " size=" << size
162                    << " expected=" << expected << " sync=" << sync
163                    << " Sendbuf[" << std::dec << it << "]=0x"
164                    << std::hex << (static_cast<int>(sendbuf[it]) & 0xff) << ','
165                    << " Recvbuf[" << std::dec << it << "]=0x"
166                    << std::hex << (static_cast<int>(recvbuf[it]) & 0xff) << std::endl;
167                 break;
168             }
169         }
170     }
171     return buffers_equal;
172 }
173 
test_xfer(xfer_func_t func,bool expected,bool sync,bool truncated)174 void test_ucp_tag_xfer::test_xfer(xfer_func_t func, bool expected, bool sync,
175                                   bool truncated)
176 {
177     if (sync) {
178         skip_err_handling();
179     }
180 
181     ucs::detail::message_stream ms("INFO");
182 
183     ms << "0 " << std::flush;
184     (this->*func)(0, expected, sync, false);
185 
186     for (unsigned i = 1; i <= 7; ++i) {
187         size_t max = (long)pow(10.0, i);
188 
189         long count = ucs_max((long)(5000.0 / sqrt(max) / ucs::test_time_multiplier()),
190                              3);
191         if (!expected) {
192             count = ucs_min(count, 50);
193         }
194         ms << count << "x10^" << i << " " << std::flush;
195         for (long j = 0; j < count; ++j) {
196             size_t size = ucs::rand() % max + 1;
197             (this->*func)(size, expected, sync, truncated);
198         }
199     }
200 }
201 
test_xfer_prepare_bufs(uint8_t * sendbuf,uint8_t * recvbuf,size_t count,bool send_contig,bool recv_contig,ucp_datatype_t * send_dt,ucp_datatype_t * recv_dt)202 void test_ucp_tag_xfer::test_xfer_prepare_bufs(uint8_t *sendbuf, uint8_t *recvbuf,
203                                                size_t count, bool send_contig,
204                                                bool recv_contig,
205                                                ucp_datatype_t *send_dt,
206                                                ucp_datatype_t *recv_dt)
207 {
208     ucs_status_t status;
209 
210     if (send_contig) {
211         /* the sender has a contig datatype for the data buffer */
212         for (unsigned i = 0; i < count; ++i) {
213              sendbuf[i] = i % 256;
214         }
215         *send_dt = DATATYPE;
216     } else {
217         /* the sender has a generic datatype */
218         status = ucp_dt_create_generic(&ucp::test_dt_uint8_ops, NULL, send_dt);
219         ASSERT_UCS_OK(status);
220     }
221 
222     if (recv_contig) {
223         /* the recv has a contig datatype for the data buffer */
224         *recv_dt = DATATYPE;
225     } else {
226         /* the receiver has a generic datatype */
227         status = ucp_dt_create_generic(&ucp::test_dt_uint8_ops, NULL, recv_dt);
228         /* the recvbuf can be NULL because we only validate the received data in the
229         * unpack function - we don't copy it to the recvbuf */
230         ASSERT_UCS_OK(status);
231     }
232 }
233 
test_run_xfer(bool send_contig,bool recv_contig,bool expected,bool sync,bool truncated)234 void test_ucp_tag_xfer::test_run_xfer(bool send_contig, bool recv_contig,
235                                       bool expected, bool sync, bool truncated)
236 {
237     static const size_t count = 1148544 / ucs::test_time_multiplier();
238     uint8_t *sendbuf = NULL, *recvbuf = NULL;
239     ucp_datatype_t send_dt, recv_dt;
240     size_t recvd;
241 
242     if (sync) {
243         skip_err_handling();
244     }
245 
246     ucp::dt_gen_start_count  = 0;
247     ucp::dt_gen_finish_count = 0;
248 
249     if (send_contig) {
250         /* the sender has a contig datatype for the data buffer */
251         sendbuf = (uint8_t*) malloc(count * sizeof(*sendbuf));
252     }
253     if (recv_contig) {
254         /* the recv has a contig datatype for the data buffer */
255         recvbuf = (uint8_t*) malloc(count * sizeof(*recvbuf));
256     }
257 
258     test_xfer_prepare_bufs(sendbuf, recvbuf, count, send_contig, recv_contig,
259                            &send_dt, &recv_dt);
260 
261     /* coverity[var_deref_model] */
262     /* coverity[var_deref_op] */
263     recvd = do_xfer(&sendbuf[0], &recvbuf[0], count, send_dt, recv_dt, expected,
264                     sync, truncated);
265     if (!truncated) {
266         EXPECT_EQ(count * sizeof(uint8_t), recvd);
267     }
268 
269     if (send_contig) {
270         free(sendbuf);
271     } else {
272         ucp_dt_destroy(send_dt);
273     }
274 
275     if (recv_contig) {
276         free(recvbuf);
277     } else {
278         ucp_dt_destroy(recv_dt);
279     }
280 }
281 
test_xfer_probe(bool send_contig,bool recv_contig,bool expected,bool sync)282 void test_ucp_tag_xfer::test_xfer_probe(bool send_contig, bool recv_contig,
283                                         bool expected, bool sync)
284 {
285     static const size_t count = 1148544 / ucs::test_time_multiplier();
286     uint8_t             *sendbuf = NULL;
287     uint8_t             *recvbuf = NULL;
288     ucp_datatype_t      send_dt, recv_dt;
289     ucp_tag_message_h   message;
290     ucp_tag_recv_info_t info;
291     request             *rreq, *sreq;
292 
293     /* the self transport doesn't do rndv and completes the send immediately */
294     skip_loopback();
295 
296     ucp::dt_gen_start_count  = 0;
297     ucp::dt_gen_finish_count = 0;
298 
299     sendbuf = (uint8_t*) malloc(count * sizeof(*sendbuf));
300     recvbuf = (uint8_t*) malloc(count * sizeof(*recvbuf));
301 
302     test_xfer_prepare_bufs(sendbuf, recvbuf, count, send_contig, recv_contig,
303                            &send_dt, &recv_dt);
304 
305     info.length = 0;
306     message = ucp_tag_probe_nb(receiver().worker(), 0x1337, 0xffff, 1, &info);
307     EXPECT_TRUE(message == NULL);
308 
309     sreq = send_nb(&sendbuf[0], count, send_dt, SENDER_TAG);
310     EXPECT_TRUE(!UCS_PTR_IS_ERR(sreq));
311     if (sreq != NULL) {
312         EXPECT_FALSE(sreq->completed);
313     }
314 
315     /* put RTS into the unexpected queue */
316     ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(10.0);
317     do {
318         short_progress_loop();
319         message = ucp_tag_probe_nb(receiver().worker(), RECV_TAG, RECV_MASK, 1, &info);
320     } while ((ucs_get_time() < loop_end_limit) && (message == NULL));
321 
322     /* make sure that there was a match (RTS) */
323     EXPECT_TRUE(message != NULL);
324     EXPECT_EQ(count, info.length);
325     EXPECT_EQ((ucp_tag_t)SENDER_TAG, info.sender_tag);
326 
327     /* coverity[var_deref_model] */
328     rreq = (request*) ucp_tag_msg_recv_nb(receiver().worker(), &recvbuf[0],
329                                           count, recv_dt, message, recv_callback);
330     ASSERT_TRUE(!UCS_PTR_IS_ERR(rreq));
331 
332     wait(rreq);
333     if (sreq != NULL) {
334         wait(sreq);
335         request_release(sreq);
336     }
337     request_release(rreq);
338 
339     free(sendbuf);
340     free(recvbuf);
341     if (!send_contig) {
342         ucp_dt_destroy(send_dt);
343     }
344     if (!recv_contig) {
345         ucp_dt_destroy(recv_dt);
346     }
347 }
348 
test_xfer_contig(size_t size,bool expected,bool sync,bool truncated)349 void test_ucp_tag_xfer::test_xfer_contig(size_t size, bool expected, bool sync,
350                                          bool truncated)
351 {
352     std::vector<char> sendbuf(size, 0);
353     std::vector<char> recvbuf(size, 0);
354 
355     ucs::fill_random(sendbuf);
356     size_t recvd = do_xfer(&sendbuf[0], &recvbuf[0], size, DATATYPE, DATATYPE,
357                            expected, sync, truncated);
358     if (!truncated) {
359         ASSERT_EQ(sendbuf.size(), recvd);
360     }
361     EXPECT_TRUE(!check_buffers(sendbuf, recvbuf, recvd, 1, 1,
362                                size, expected, sync, "contig"));
363 }
364 
test_xfer_generic(size_t size,bool expected,bool sync,bool truncated)365 void test_ucp_tag_xfer::test_xfer_generic(size_t size, bool expected, bool sync,
366                                           bool truncated)
367 {
368     size_t count = size / sizeof(uint32_t);
369     ucp_datatype_t dt;
370     ucs_status_t status;
371     size_t recvd;
372 
373     ucp::dt_gen_start_count  = 0;
374     ucp::dt_gen_finish_count = 0;
375 
376     /* if count is zero, truncation has no effect */
377     if ((truncated) && (!count)) {
378         truncated = false;
379     }
380 
381     status = ucp_dt_create_generic(&ucp::test_dt_uint32_ops, NULL, &dt);
382     ASSERT_UCS_OK(status);
383 
384     recvd = do_xfer(NULL, NULL, count, dt, dt, expected, sync, truncated);
385     if (!truncated) {
386         EXPECT_EQ(count * sizeof(uint32_t), recvd);
387     }
388     EXPECT_EQ(2, ucp::dt_gen_start_count);
389     EXPECT_EQ(2, ucp::dt_gen_finish_count);
390 
391     ucp_dt_destroy(dt);
392 }
393 
test_xfer_iov(size_t size,bool expected,bool sync,bool truncated)394 void test_ucp_tag_xfer::test_xfer_iov(size_t size, bool expected, bool sync,
395                                       bool truncated)
396 {
397     const size_t iovcnt = 20;
398     std::vector<char> sendbuf(size, 0);
399     std::vector<char> recvbuf(size, 0);
400 
401     ucs::fill_random(sendbuf);
402 
403     ucp::data_type_desc_t send_dt_desc(DATATYPE_IOV, sendbuf.data(),
404                                        sendbuf.size(), iovcnt);
405     ucp::data_type_desc_t recv_dt_desc(DATATYPE_IOV, recvbuf.data(),
406                                        recvbuf.size(), iovcnt);
407 
408     size_t recvd = do_xfer(send_dt_desc.buf(), recv_dt_desc.buf(), iovcnt,
409                            DATATYPE_IOV, DATATYPE_IOV, expected, sync,
410                            truncated);
411     if (!truncated) {
412         ASSERT_EQ(sendbuf.size(), recvd);
413     }
414     EXPECT_TRUE(!check_buffers(sendbuf, recvbuf, recvd, send_dt_desc.count(),
415                                recv_dt_desc.count(), size, expected, sync,
416                                "IOV"));
417 }
418 
test_xfer_generic_err(size_t size,bool expected,bool sync,bool truncated)419 void test_ucp_tag_xfer::test_xfer_generic_err(size_t size, bool expected,
420                                               bool sync, bool truncated)
421 {
422     size_t count = size / sizeof(uint32_t);
423     ucp_datatype_t dt;
424     ucs_status_t status;
425     request *rreq, *sreq;
426 
427     ucp::dt_gen_start_count  = 0;
428     ucp::dt_gen_finish_count = 0;
429 
430     status = ucp_dt_create_generic(&ucp::test_dt_uint32_err_ops, this, &dt);
431     ASSERT_UCS_OK(status);
432 
433     if (expected) {
434         rreq = recv_nb(NULL, count, dt, RECV_TAG, RECV_MASK);
435         sreq = do_send(NULL, count, dt, sync);
436     } else {
437         sreq = do_send(NULL, count, dt, sync);
438         short_progress_loop();
439         if (sync) {
440             EXPECT_FALSE(sreq->completed);
441         }
442         rreq = recv_nb(NULL, count, dt, RECV_TAG, RECV_MASK);
443     }
444 
445     /* progress both sender and receiver */
446     wait(rreq);
447     if (sreq != NULL) {
448         wait(sreq);
449         request_release(sreq);
450     }
451 
452     /* the generic unpack function is expected to fail */
453     EXPECT_EQ(UCS_ERR_NO_MEMORY, rreq->status);
454     request_release(rreq);
455     EXPECT_EQ(2, ucp::dt_gen_start_count);
456     EXPECT_EQ(2, ucp::dt_gen_finish_count);
457     ucp_dt_destroy(dt);
458 }
459 
460 test_ucp_tag_xfer::request*
do_send(const void * sendbuf,size_t count,ucp_datatype_t dt,bool sync)461 test_ucp_tag_xfer::do_send(const void *sendbuf, size_t count, ucp_datatype_t dt,
462                            bool sync)
463 {
464     if (sync) {
465         return send_sync_nb(sendbuf, count, dt, SENDER_TAG);
466     } else {
467         if (GetParam().variant == VARIANT_SEND_NBR) {
468             return send_nbr(sendbuf, count, dt, SENDER_TAG);
469         }
470         return send_nb(sendbuf, count, dt, SENDER_TAG);
471     }
472 }
473 
do_xfer(const void * sendbuf,void * recvbuf,size_t count,ucp_datatype_t send_dt,ucp_datatype_t recv_dt,bool expected,bool sync,bool truncated)474 size_t test_ucp_tag_xfer::do_xfer(const void *sendbuf, void *recvbuf,
475                                   size_t count, ucp_datatype_t send_dt,
476                                   ucp_datatype_t recv_dt, bool expected,
477                                   bool sync, bool truncated)
478 {
479     request *rreq, *sreq;
480     size_t recvd = 0;
481     size_t recv_count = count;
482 
483     if (truncated) {
484         recv_count /= 2;
485     }
486 
487     if (expected) {
488         rreq = recv_nb(recvbuf, recv_count, recv_dt, RECV_TAG, RECV_MASK);
489         sreq = do_send(sendbuf, count, send_dt, sync);
490     } else {
491         sreq = do_send(sendbuf, count, send_dt, sync);
492 
493         wait_for_unexpected_msg(receiver().worker(), 10.0);
494 
495         if (sync) {
496             EXPECT_FALSE(sreq->completed);
497         }
498         rreq = recv_nb(recvbuf, recv_count, recv_dt, RECV_TAG, RECV_MASK);
499     }
500 
501     /* progress both sender and receiver */
502     wait(rreq);
503     if (sreq != NULL) {
504         wait(sreq);
505         request_release(sreq);
506     }
507 
508     recvd = rreq->info.length;
509     if (!truncated) {
510         EXPECT_UCS_OK(rreq->status);
511         EXPECT_EQ((ucp_tag_t)SENDER_TAG, rreq->info.sender_tag);
512     } else {
513         EXPECT_EQ(UCS_ERR_MESSAGE_TRUNCATED, rreq->status);
514     }
515 
516     request_release(rreq);
517     return recvd;
518 }
519 
test_xfer_len_offset()520 void test_ucp_tag_xfer::test_xfer_len_offset()
521 {
522     const size_t max_offset  = 128;
523     const size_t max_length  = 64 * UCS_KBYTE;
524     const size_t min_length  = UCS_KBYTE;
525     const size_t offset_step = 16;
526     const size_t length_step = 16;
527     const size_t buf_size    = max_length + max_offset + 2;
528     ucp_datatype_t type      = ucp_dt_make_contig(1);
529     void *send_buf           = 0;
530     void *recv_buf           = 0;;
531     size_t offset;
532     size_t length;
533     ucs::detail::message_stream *ms;
534 
535     skip_err_handling();
536 
537     EXPECT_EQ(posix_memalign(&send_buf, 8192, buf_size), 0);
538     EXPECT_EQ(posix_memalign(&recv_buf, 8192, buf_size), 0);
539 
540     memset(send_buf, 0, buf_size);
541     memset(recv_buf, 0, buf_size);
542 
543     for (offset = 0; offset <= max_offset; offset += offset_step) {
544         if (!offset || ucs_is_pow2(offset)) {
545             ms = new ucs::detail::message_stream("INFO");
546             *ms << "offset: " << offset << ": ";
547         } else {
548             ms = NULL;
549         }
550         for (length = min_length; length <= max_length; length += length_step) {
551             if (ms && ucs_is_pow2(length)) {
552                 *ms << length << " ";
553                 fflush(stdout);
554             }
555 
556             do_xfer((char*)send_buf + offset, (char*)recv_buf + offset,
557                     length, type, type, true, true, false);
558             do_xfer((char*)send_buf + max_offset - offset,
559                     (char*)recv_buf + max_offset - offset,
560                     length, type, type, true, true, false);
561         }
562         if (ms) {
563             delete(ms);
564         }
565     }
566 
567     free(recv_buf);
568     free(send_buf);
569 }
570 
/* contig datatype, expected receive */
UCS_TEST_P(test_ucp_tag_xfer, contig_exp) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_contig,
              true,   /* expected */
              false,  /* sync */
              false); /* truncated */
}
574 
/* contig datatype, expected receive, truncated message */
UCS_TEST_P(test_ucp_tag_xfer, contig_exp_truncated) {
    check_offload_support(false);
    test_xfer(&test_ucp_tag_xfer::test_xfer_contig,
              true,  /* expected */
              false, /* sync */
              true); /* truncated */
}
579 
/* contig datatype, unexpected receive */
UCS_TEST_P(test_ucp_tag_xfer, contig_unexp) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_contig,
              false,  /* expected */
              false,  /* sync */
              false); /* truncated */
}
583 
/* generic datatype, expected receive */
UCS_TEST_P(test_ucp_tag_xfer, generic_exp) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic,
              true,   /* expected */
              false,  /* sync */
              false); /* truncated */
}
587 
/* generic datatype, expected receive, truncated message */
UCS_TEST_P(test_ucp_tag_xfer, generic_exp_truncated) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic,
              true,  /* expected */
              false, /* sync */
              true); /* truncated */
}
591 
/* generic datatype, unexpected receive */
UCS_TEST_P(test_ucp_tag_xfer, generic_unexp) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic,
              false,  /* expected */
              false,  /* sync */
              false); /* truncated */
}
595 
/* generic datatype, unexpected receive, truncated message */
UCS_TEST_P(test_ucp_tag_xfer, generic_unexp_truncated) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic,
              false, /* expected */
              false, /* sync */
              true); /* truncated */
}
599 
/* IOV datatype, expected receive */
UCS_TEST_P(test_ucp_tag_xfer, iov_exp) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_iov,
              true,   /* expected */
              false,  /* sync */
              false); /* truncated */
}
603 
/* IOV datatype, expected receive, truncated message */
UCS_TEST_P(test_ucp_tag_xfer, iov_exp_truncated) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_iov,
              true,  /* expected */
              false, /* sync */
              true); /* truncated */
}
607 
/* IOV datatype, unexpected receive */
UCS_TEST_P(test_ucp_tag_xfer, iov_unexp) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_iov,
              false,  /* expected */
              false,  /* sync */
              false); /* truncated */
}
611 
/* failing generic datatype, expected receive */
UCS_TEST_P(test_ucp_tag_xfer, generic_err_exp) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic_err,
              true,   /* expected */
              false,  /* sync */
              false); /* truncated */
}
615 
/* failing generic datatype, unexpected receive; skipped when skip_on_ib_dc()
 * holds */
UCS_TEST_SKIP_COND_P(test_ucp_tag_xfer, generic_err_unexp,
                     skip_on_ib_dc()) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic_err,
              false,  /* expected */
              false,  /* sync */
              false); /* truncated */
}
620 
/* failing generic datatype, expected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, generic_err_exp_sync) {
    /* skipped on loopback: when the send operation completes immediately, a
     * status is returned instead of a request */
    skip_loopback();
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic_err,
              true,   /* expected */
              true,   /* sync */
              false); /* truncated */
}
627 
/* failing generic datatype, unexpected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, generic_err_unexp_sync) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic_err,
              false,  /* expected */
              true,   /* sync */
              false); /* truncated */
}
631 
/* contig datatype, expected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, contig_exp_sync) {
    /* skipped on loopback: when the send operation completes immediately, a
     * status is returned instead of a request */
    skip_loopback();
    test_xfer(&test_ucp_tag_xfer::test_xfer_contig,
              true,   /* expected */
              true,   /* sync */
              false); /* truncated */
}
638 
639 UCS_TEST_P(test_ucp_tag_xfer, contig_exp_sync_zcopy, "ZCOPY_THRESH=1000") {
640     skip_loopback();
641     test_xfer(&test_ucp_tag_xfer::test_xfer_contig, true, true, false);
642 }
643 
/* contig datatype, unexpected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, contig_unexp_sync) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_contig,
              false,  /* expected */
              true,   /* sync */
              false); /* truncated */
}
647 
/* generic datatype, expected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, generic_exp_sync) {
    /* skipped on loopback: when the send operation completes immediately, a
     * status is returned instead of a request */
    skip_loopback();
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic,
              true,   /* expected */
              true,   /* sync */
              false); /* truncated */
}
654 
/* generic datatype, unexpected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, generic_unexp_sync) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_generic,
              false,  /* expected */
              true,   /* sync */
              false); /* truncated */
}
658 
/* IOV datatype, expected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, iov_exp_sync) {
    /* skipped on loopback: when the send operation completes immediately, a
     * status is returned instead of a request */
    skip_loopback();
    test_xfer(&test_ucp_tag_xfer::test_xfer_iov,
              true,   /* expected */
              true,   /* sync */
              false); /* truncated */
}
665 
/* IOV datatype, unexpected receive, sync send */
UCS_TEST_P(test_ucp_tag_xfer, iov_unexp_sync) {
    test_xfer(&test_ucp_tag_xfer::test_xfer_iov,
              false,  /* expected */
              true,   /* sync */
              false); /* truncated */
}
669 
670 /* send_contig_recv_contig */
671 
672 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_exp, "RNDV_THRESH=1248576") {
673     test_run_xfer(true, true, true, false, false);
674 }
675 
676 /* send_generic_recv_generic */
677 
678 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_exp, "RNDV_THRESH=1248576") {
679     test_run_xfer(false, false, true, false, false);
680 }
681 
682 /* send_contig_recv_generic */
683 
684 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp, "RNDV_THRESH=1248576") {
685     test_run_xfer(true, false, true, false, false);
686 }
687 
688 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_sync, "RNDV_THRESH=1248576") {
689     test_run_xfer(true, false, false, true, false);
690 }
691 
692 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_sync, "RNDV_THRESH=1248576") {
693     /* because ucp_tag_send_req return status (instead request) if send operation
694      * completed immediately */
695     skip_loopback();
696     test_run_xfer(true, false, true, true, false);
697 }
698 
699 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp, "RNDV_THRESH=1248576") {
700     test_run_xfer(true, false, false, false, false);
701 }
702 
703 /* send_generic_recv_contig */
704 
705 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_exp, "RNDV_THRESH=1248576") {
706     test_run_xfer(false, true, true, false, false);
707 }
708 
709 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_unexp_sync, "RNDV_THRESH=1248576") {
710     test_run_xfer(false, true, false, true, false);
711 }
712 
713 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_exp_sync, "RNDV_THRESH=1248576") {
714     /* because ucp_tag_send_req return status (instead request) if send operation
715      * completed immediately */
716     skip_loopback();
717     test_run_xfer(false, true, true, true, false);
718 }
719 
720 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_unexp, "RNDV_THRESH=1248576",
721         "ZCOPY_THRESH=1248576") {
722     test_run_xfer(false, true, false, false, false);
723 }
724 
/* rndv send_contig_recv_contig am_rndv with bcopy on the sender side
 * (zcopy is tested in the match tests) */
727 
728 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_exp_rndv, "RNDV_THRESH=1000",
729                                                                 "ZCOPY_THRESH=1248576") {
730     test_run_xfer(true, true, true, false, false);
731 }
732 
733 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_exp_rndv_truncated, "RNDV_THRESH=1000",
734                                                                           "ZCOPY_THRESH=1248576") {
735     check_offload_support(false);
736     test_run_xfer(true, true, true, false, true);
737 }
738 
739 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_exp_sync_rndv, "RNDV_THRESH=1000",
740                                                                      "ZCOPY_THRESH=1248576") {
741     /* because ucp_tag_send_req return status (instead request) if send operation
742      * completed immediately */
743     skip_loopback();
744     test_run_xfer(true, true, true, true, false);
745 }
746 
747 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_exp_sync_rndv_truncated,
748            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
749     /* because ucp_tag_send_req return status (instead request) if send operation
750      * completed immediately */
751     skip_loopback();
752     test_run_xfer(true, true, true, true, true);
753 }
754 
755 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_unexp_rndv, "RNDV_THRESH=1000",
756                                                                   "ZCOPY_THRESH=1248576") {
757     test_run_xfer(true, true, false, false, false);
758 }
759 
760 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_unexp_rndv_truncated, "RNDV_THRESH=1000",
761                                                                             "ZCOPY_THRESH=1248576") {
762     test_run_xfer(true, true, false, false, true);
763 }
764 
765 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_unexp_sync_rndv, "RNDV_THRESH=1000",
766                                                                         "ZCOPY_THRESH=1248576") {
767     test_run_xfer(true, true, false, true, false);
768 }
769 
770 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_unexp_sync_rndv_truncated,
771            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
772     test_run_xfer(true, true, false, true, true);
773 }
774 
775 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_contig_exp_rndv_probe, "RNDV_THRESH=1000",
776                                                                       "ZCOPY_THRESH=1248576") {
777     test_xfer_probe(true, true, true, false);
778 }
779 
780 /* rndv send_generic_recv_generic am_rndv with bcopy on the sender side */
781 
782 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_exp_rndv, "RNDV_THRESH=1000") {
783     test_run_xfer(false, false, true, false, false);
784 }
785 
786 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_exp_rndv_truncated, "RNDV_THRESH=1000") {
787     test_run_xfer(false, false, true, false, true);
788 }
789 
790 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_exp_sync_rndv, "RNDV_THRESH=1000") {
791     /* because ucp_tag_send_req return status (instead request) if send operation
792      * completed immediately */
793     skip_loopback();
794     test_run_xfer(false, false, true, true, false);
795 }
796 
797 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_exp_sync_rndv_truncated,
798            "RNDV_THRESH=1000") {
799     /* because ucp_tag_send_req return status (instead request) if send operation
800      * completed immediately */
801     skip_loopback();
802     test_run_xfer(false, false, true, true, true);
803 }
804 
805 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_unexp_rndv, "RNDV_THRESH=1000") {
806     test_run_xfer(false, false, false, false, false);
807 }
808 
809 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_unexp_rndv_truncated, "RNDV_THRESH=1000") {
810     test_run_xfer(false, false, false, false, true);
811 }
812 
813 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_unexp_sync_rndv, "RNDV_THRESH=1000") {
814     test_run_xfer(false, false, false, true, false);
815 }
816 
817 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_unexp_sync_rndv_truncated,
818            "RNDV_THRESH=1000") {
819     test_run_xfer(false, false, false, true, true);
820 }
821 
822 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_generic_exp_rndv_probe, "RNDV_THRESH=1000") {
823     test_xfer_probe(false, false, true, false);
824 }
825 
826 /* rndv send_generic_recv_contig am_rndv with bcopy on the sender side */
827 
828 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_exp_rndv, "RNDV_THRESH=1000") {
829     test_run_xfer(false, true, true, false, false);
830 }
831 
832 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_exp_rndv_truncated, "RNDV_THRESH=1000") {
833     test_run_xfer(false, true, true, false, true);
834 }
835 
836 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_exp_sync_rndv, "RNDV_THRESH=1000") {
837     /* because ucp_tag_send_req return status (instead request) if send operation
838      * completed immediately */
839     skip_loopback();
840     test_run_xfer(false, true, true, true, false);
841 }
842 
843 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_exp_sync_rndv_truncated,
844            "RNDV_THRESH=1000") {
845     /* because ucp_tag_send_req return status (instead request) if send operation
846      * completed immediately */
847     skip_loopback();
848     test_run_xfer(false, true, true, true, true);
849 }
850 
851 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_unexp_rndv, "RNDV_THRESH=1000") {
852     test_run_xfer(false, true, false, false, false);
853 }
854 
855 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_unexp_rndv_truncated, "RNDV_THRESH=1000") {
856     test_run_xfer(false, true, false, false, true);
857 }
858 
859 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_unexp_sync_rndv, "RNDV_THRESH=1000") {
860     test_run_xfer(false, true, false, true, false);
861 }
862 
863 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_unexp_sync_rndv_truncated,
864            "RNDV_THRESH=1000") {
865     test_run_xfer(false, true, false, true, true);
866 }
867 
868 UCS_TEST_P(test_ucp_tag_xfer, send_generic_recv_contig_exp_rndv_probe, "RNDV_THRESH=1000") {
869     test_xfer_probe(false, true, true, false);
870 }
871 
872 /* rndv send_contig_recv_generic am_rndv with bcopy on the sender side */
873 
874 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_rndv, "RNDV_THRESH=1000",
875                                                                  "ZCOPY_THRESH=1248576") {
876     test_run_xfer(true, false, true, false, false);
877 }
878 
879 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_rndv_truncated,
880            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
881     test_run_xfer(true, false, true, false, true);
882 }
883 
884 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_sync_rndv,
885            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
886     /* because ucp_tag_send_req return status (instead request) if send operation
887      * completed immediately */
888     skip_loopback();
889     test_run_xfer(true, false, true, true, false);
890 }
891 
892 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_sync_rndv_truncated,
893            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
894     /* because ucp_tag_send_req return status (instead request) if send operation
895      * completed immediately */
896     skip_loopback();
897     test_run_xfer(true, false, true, true, true);
898 }
899 
900 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_rndv,
901            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
902     test_run_xfer(true, false, false, false, false);
903 }
904 
905 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_rndv_truncated,
906            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
907     test_run_xfer(true, false, false, false, true);
908 }
909 
910 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_sync_rndv,
911            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
912     test_run_xfer(true, false, false, true, false);
913 }
914 
915 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_sync_rndv_truncated,
916            "RNDV_THRESH=1000", "ZCOPY_THRESH=1248576") {
917     test_run_xfer(true, false, false, true, true);
918 }
919 
920 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_rndv_probe, "RNDV_THRESH=1000",
921                                                                        "ZCOPY_THRESH=1248576") {
922     test_xfer_probe(true, false, true, false);
923 }
924 
925 /* rndv send_contig_recv_generic am_rndv with zcopy on the sender side */
926 
927 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_rndv_zcopy, "RNDV_THRESH=1000",
928                                                                        "ZCOPY_THRESH=1000") {
929     test_run_xfer(true, false, true, false, false);
930 }
931 
932 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_rndv_truncated_zcopy,
933            "RNDV_THRESH=1000", "ZCOPY_THRESH=1000") {
934     test_run_xfer(true, false, true, false, true);
935 }
936 
937 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_sync_rndv_zcopy,
938            "RNDV_THRESH=1000", "ZCOPY_THRESH=1000") {
939     /* because ucp_tag_send_req return status (instead request) if send operation
940      * completed immediately */
941     skip_loopback();
942     test_run_xfer(true, false, true, true, false);
943 }
944 
945 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_sync_rndv_truncated_zcopy,
946            "RNDV_THRESH=1000", "ZCOPY_THRESH=1000") {
947     /* because ucp_tag_send_req return status (instead request) if send operation
948      * completed immediately */
949     skip_loopback();
950     test_run_xfer(true, false, true, true, true);
951 }
952 
953 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_rndv_zcopy,
954            "RNDV_THRESH=1000", "ZCOPY_THRESH=1000") {
955     test_run_xfer(true, false, false, false, false);
956 }
957 
958 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_rndv_truncated_zcopy,
959            "RNDV_THRESH=1000", "ZCOPY_THRESH=1000") {
960     test_run_xfer(true, false, false, false, true);
961 }
962 
963 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_sync_rndv_zcopy,
964            "RNDV_THRESH=1000", "ZCOPY_THRESH=1000") {
965     test_run_xfer(true, false, false, true, false);
966 }
967 
968 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_unexp_sync_rndv_truncated_zcopy,
969            "RNDV_THRESH=1000", "ZCOPY_THRESH=1000") {
970     test_run_xfer(true, false, false, true, true);
971 }
972 
973 UCS_TEST_P(test_ucp_tag_xfer, send_contig_recv_generic_exp_rndv_probe_zcopy, "RNDV_THRESH=1000",
974                                                                              "ZCOPY_THRESH=1000") {
975     test_xfer_probe(true, false, true, false);
976 }
977 
/* Runs the fixture's test_xfer_len_offset() helper with RNDV_THRESH=1000.
 * RUNNING_ON_VALGRIND is passed as the skip condition of
 * UCS_TEST_SKIP_COND_P, so the case is presumably not executed under
 * valgrind - confirm against the macro definition. */
UCS_TEST_SKIP_COND_P(test_ucp_tag_xfer, test_xfer_len_offset,
                     RUNNING_ON_VALGRIND, "RNDV_THRESH=1000") {
    test_xfer_len_offset();
}
982 
983 UCS_TEST_P(test_ucp_tag_xfer, iov_with_empty_buffers, "ZCOPY_THRESH=512") {
984     const size_t iovcnt    = ucp::data_type_desc_t::MAX_IOV;
985     const size_t size      = UCS_KBYTE;
986     const int    expected  = 1;
987     const int    sync      = 0;
988     const int    truncated = 0;
989 
990     std::vector<char> sendbuf(size, 0);
991     std::vector<char> recvbuf(size, 0);
992     ucp_dt_iov_t iovec[iovcnt];
993 
994     ucs::fill_random(sendbuf);
995 
996     /* initialize iovec with MAX_IOV-1 empty buffers and one non-empty */
997     for (size_t i = 0; i < iovcnt - 1; ++i) {
998         iovec[i].buffer = NULL;
999         iovec[i].length = 0;
1000     }
1001 
1002     /* coverity[escape] */
1003     iovec[iovcnt - 1].buffer = &sendbuf[0];
1004     iovec[iovcnt - 1].length = size;
1005 
1006     ucp::data_type_desc_t recv_dt_desc(DATATYPE_IOV, recvbuf.data(),
1007                                        recvbuf.size(), iovcnt);
1008 
1009     size_t recvd = do_xfer(iovec, recv_dt_desc.buf(), iovcnt,
1010                            DATATYPE_IOV, DATATYPE_IOV, expected, 0,
1011                            truncated);
1012 
1013     ASSERT_EQ(sendbuf.size(), recvd);
1014     EXPECT_TRUE(!check_buffers(sendbuf, recvbuf, recvd, iovcnt,
1015                                recv_dt_desc.count(), size, expected, sync,
1016                                "IOV"));
1017 }
1018 
/* Instantiate the test_ucp_tag_xfer parameterized test cases defined above */
UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_xfer)
1020 
1021 
1022 #ifdef ENABLE_STATS
1023 
1024 class test_ucp_tag_stats : public test_ucp_tag_xfer {
1025 public:
init()1026     void init() {
1027         stats_activate();
1028         test_ucp_tag_xfer::init();
1029     }
1030 
cleanup()1031     void cleanup() {
1032         test_ucp_tag_xfer::cleanup();
1033         stats_restore();
1034     }
1035 
1036     std::vector<ucp_test_param>
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)1037     static enum_test_params(const ucp_params_t& ctx_params,
1038                             const std::string& name,
1039                             const std::string& test_case_name,
1040                             const std::string& tls) {
1041 
1042         return ucp_test::enum_test_params(ctx_params, name,
1043                                           test_case_name, tls);
1044     }
1045 
ep_stats(entity & e)1046     ucs_stats_node_t* ep_stats(entity &e) {
1047         return e.ep()->stats;
1048     }
1049 
worker_stats(entity & e)1050     ucs_stats_node_t* worker_stats(entity &e) {
1051         return e.worker()->stats;
1052     }
1053 
get_rx_stat(unsigned counter)1054     unsigned get_rx_stat(unsigned counter) {
1055         return UCS_STATS_GET_COUNTER(worker_stats(receiver()), counter);
1056     }
1057 
validate_counters(unsigned tx_counter,unsigned rx_counter)1058     void validate_counters(unsigned tx_counter, unsigned rx_counter) {
1059         uint64_t cnt;
1060         cnt = UCS_STATS_GET_COUNTER(ep_stats(sender()), tx_counter);
1061         EXPECT_EQ(1ul, cnt);
1062         cnt = get_rx_stat(rx_counter);
1063         EXPECT_EQ(1ul, cnt);
1064     }
1065 
has_xpmem()1066     bool has_xpmem() {
1067         return ucp_context_find_tl_md(receiver().ucph(), "xpmem") != NULL;
1068     }
1069 
has_get_zcopy()1070     bool has_get_zcopy() {
1071         return has_transport("rc_v") || has_transport("rc_x") ||
1072                has_transport("dc_x") ||
1073                (ucp_context_find_tl_md(receiver().ucph(), "cma")  != NULL) ||
1074                (ucp_context_find_tl_md(receiver().ucph(), "knem") != NULL);
1075     }
1076 
validate_rndv_counters()1077     void validate_rndv_counters() {
1078         unsigned get_zcopy = get_rx_stat(UCP_WORKER_STAT_TAG_RX_RNDV_GET_ZCOPY);
1079         unsigned send_rtr  = get_rx_stat(UCP_WORKER_STAT_TAG_RX_RNDV_SEND_RTR);
1080         unsigned rkey_ptr  = get_rx_stat(UCP_WORKER_STAT_TAG_RX_RNDV_RKEY_PTR);
1081 
1082         UCS_TEST_MESSAGE << "get_zcopy: " << get_zcopy
1083                          << " send_rtr: " << send_rtr
1084                          << " rkey_ptr: " << rkey_ptr;
1085         EXPECT_EQ(1, get_zcopy + send_rtr + rkey_ptr);
1086 
1087         if (has_xpmem()) {
1088             /* rkey_ptr expected to be selected if xpmem is available */
1089             EXPECT_EQ(1u, rkey_ptr);
1090         } else if (has_get_zcopy()) {
1091             /* if any transports supports get_zcopy, expect it to be used */
1092             EXPECT_EQ(1u, get_zcopy);
1093         } else {
1094             /* Could be a transport which supports get_zcopy that wasn't
1095              * accounted for, or fallback to RTR. In any case, rkey_ptr is not
1096              * expected to be used.
1097              */
1098             EXPECT_EQ(1u, send_rtr + get_zcopy);
1099         }
1100     }
1101 
1102 };
1103 
1104 
1105 UCS_TEST_P(test_ucp_tag_stats, eager_expected, "RNDV_THRESH=1248576") {
1106     check_offload_support(false);
1107     test_run_xfer(true, true, true, false, false);
1108     validate_counters(UCP_EP_STAT_TAG_TX_EAGER,
1109                       UCP_WORKER_STAT_TAG_RX_EAGER_MSG);
1110 
1111     uint64_t cnt;
1112     cnt = UCS_STATS_GET_COUNTER(worker_stats(receiver()),
1113                                 UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP);
1114     EXPECT_EQ(cnt, 0ul);
1115 }
1116 
1117 UCS_TEST_P(test_ucp_tag_stats, eager_unexpected, "RNDV_THRESH=1248576") {
1118     check_offload_support(false);
1119     test_run_xfer(true, true, false, false, false);
1120     validate_counters(UCP_EP_STAT_TAG_TX_EAGER,
1121                       UCP_WORKER_STAT_TAG_RX_EAGER_MSG);
1122     uint64_t cnt;
1123     cnt = UCS_STATS_GET_COUNTER(worker_stats(receiver()),
1124                                 UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP);
1125     EXPECT_GT(cnt, 0ul);
1126 }
1127 
1128 UCS_TEST_P(test_ucp_tag_stats, sync_expected, "RNDV_THRESH=1248576") {
1129     check_offload_support(false);
1130     skip_loopback();
1131     test_run_xfer(true, true, true, true, false);
1132     validate_counters(UCP_EP_STAT_TAG_TX_EAGER_SYNC,
1133                       UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG);
1134 
1135     uint64_t cnt;
1136     cnt = UCS_STATS_GET_COUNTER(worker_stats(receiver()),
1137                                  UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP);
1138     EXPECT_EQ(cnt, 0ul);
1139 }
1140 
1141 UCS_TEST_P(test_ucp_tag_stats, sync_unexpected, "RNDV_THRESH=1248576") {
1142     check_offload_support(false);
1143     skip_loopback();
1144     test_run_xfer(true, true, false, true, false);
1145     validate_counters(UCP_EP_STAT_TAG_TX_EAGER_SYNC,
1146                       UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG);
1147     uint64_t cnt;
1148     cnt = UCS_STATS_GET_COUNTER(worker_stats(receiver()),
1149                                 UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP);
1150     EXPECT_GT(cnt, 0ul);
1151 }
1152 
1153 UCS_TEST_P(test_ucp_tag_stats, rndv_expected, "RNDV_THRESH=1000") {
1154     check_offload_support(false);
1155     test_run_xfer(true, true, true, false, false);
1156     validate_counters(UCP_EP_STAT_TAG_TX_RNDV,
1157                       UCP_WORKER_STAT_TAG_RX_RNDV_EXP);
1158     validate_rndv_counters();
1159 }
1160 
1161 UCS_TEST_P(test_ucp_tag_stats, rndv_unexpected, "RNDV_THRESH=1000") {
1162     check_offload_support(false);
1163     test_run_xfer(true, true, false, false, false);
1164     validate_counters(UCP_EP_STAT_TAG_TX_RNDV,
1165                       UCP_WORKER_STAT_TAG_RX_RNDV_UNEXP);
1166     validate_rndv_counters();
1167 }
1168 
/* Instantiate the test_ucp_tag_stats parameterized test cases defined above */
UCP_INSTANTIATE_TEST_CASE(test_ucp_tag_stats)
1170 
1171 #endif
1172