/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
 * Copyright (C) The University of Tennessee and The University
 *               of Tennessee Research Foundation. 2016. ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */
8 
9 #ifdef HAVE_CONFIG_H
10 #  include "config.h"
11 #endif
12 
13 #include <tools/perf/lib/libperf_int.h>
14 
15 extern "C" {
16 #include <ucs/debug/log.h>
17 #include <ucs/sys/preprocessor.h>
18 #include <ucs/sys/math.h>
19 #include <ucs/sys/sys.h>
20 }
21 
22 #include <limits>
23 
24 template <ucx_perf_cmd_t CMD, ucx_perf_test_type_t TYPE, uct_perf_data_layout_t DATA, bool ONESIDED>
25 class uct_perf_test_runner {
26 public:
27 
28     typedef uint8_t psn_t;
29 
    /**
     * Construct a runner bound to an existing perf context.
     *
     * Initializes the completion counter (count==1 means "nothing in flight",
     * see outstanding()) and, if the interface supports any AM flavor,
     * installs am_hander so incoming active messages update m_last_recvd_sn.
     */
    uct_perf_test_runner(ucx_perf_context_t &perf) :
        m_perf(perf),
        m_max_outstanding(m_perf.params.max_outstanding),
        m_send_b_count(0)

    {
        ucs_assert_always(m_max_outstanding > 0);

        /* count starts at 1 so outstanding() == count - 1 is zero initially */
        m_completion.count = 1;
        m_completion.func  = NULL;
        m_last_recvd_sn    = 0;

        ucs_status_t status;
        uct_iface_attr_t attr;
        status = uct_iface_query(m_perf.uct.iface, &attr);
        ucs_assert_always(status == UCS_OK);
        if (attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT |
                              UCT_IFACE_FLAG_AM_BCOPY |
                              UCT_IFACE_FLAG_AM_ZCOPY)) {
            /* the handler argument is where am_hander stores the latest SN */
            status = uct_iface_set_am_handler(m_perf.uct.iface,
                                              UCT_PERF_TEST_AM_ID, am_hander,
                                              (void*)&m_last_recvd_sn, 0);
            ucs_assert_always(status == UCS_OK);
        }
    }
55 
    /* Uninstall the AM handler installed by the constructor */
    ~uct_perf_test_runner() {
        uct_iface_set_am_handler(m_perf.uct.iface, UCT_PERF_TEST_AM_ID, NULL,
                                 NULL, 0);
    }
60 
61     /**
62      * Make uct_iov_t iov[msg_size_cnt] array with pointer elements to
63      * original buffer
64      */
uct_perf_get_buffer_iov(uct_iov_t * iov,void * buffer,unsigned header_size,uct_mem_h memh,const ucx_perf_context_t * perf)65     static void uct_perf_get_buffer_iov(uct_iov_t *iov, void *buffer,
66                                         unsigned header_size, uct_mem_h memh,
67                                         const ucx_perf_context_t *perf)
68     {
69         const size_t iovcnt    = perf->params.msg_size_cnt;
70         size_t iov_length_it, iov_it;
71 
72         ucs_assert(UCT_PERF_DATA_LAYOUT_ZCOPY == DATA);
73         ucs_assert(NULL != perf->params.msg_size_list);
74         ucs_assert(iovcnt > 0);
75         ucs_assert(perf->params.msg_size_list[0] >= header_size);
76 
77         iov_length_it = 0;
78         for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
79             iov[iov_it].buffer = (char *)buffer + iov_length_it + header_size;
80             iov[iov_it].length = perf->params.msg_size_list[iov_it] - header_size;
81             iov[iov_it].memh   = memh;
82             iov[iov_it].stride = 0;
83             iov[iov_it].count  = 1;
84 
85             if (perf->params.iov_stride) {
86                 iov_length_it += perf->params.iov_stride - header_size;
87             } else {
88                 iov_length_it += iov[iov_it].length;
89             }
90 
91             header_size        = 0; /* should be zero for next iterations */
92         }
93 
94         ucs_debug("IOV buffer filled by %lu slices with total length %lu",
95                   iovcnt, iov_length_it);
96     }
97 
uct_perf_test_prepare_iov_buffer()98     void uct_perf_test_prepare_iov_buffer() {
99         if (UCT_PERF_DATA_LAYOUT_ZCOPY == DATA) {
100             size_t start_iov_buffer_size = 0;
101             if (UCX_PERF_CMD_AM == CMD) {
102                 start_iov_buffer_size = m_perf.params.am_hdr_size;
103             }
104             uct_perf_get_buffer_iov(m_perf.uct.iov, m_perf.send_buffer,
105                                     start_iov_buffer_size,
106                                     m_perf.uct.send_mem.memh,
107                                     &m_perf);
108         }
109     }
110 
111     /**
112      * Get the length between beginning of the IOV first buffer and the latest byte
113      * in the latest IOV buffer.
114      */
uct_perf_get_buffer_extent(const ucx_perf_params_t * params)115     size_t uct_perf_get_buffer_extent(const ucx_perf_params_t *params)
116     {
117         size_t length;
118 
119         if ((UCT_PERF_DATA_LAYOUT_ZCOPY == DATA) && params->iov_stride) {
120             length = ((params->msg_size_cnt - 1) * params->iov_stride) +
121                      params->msg_size_list[params->msg_size_cnt - 1];
122         } else {
123             length = ucx_perf_get_message_size(params);
124         }
125 
126         return length;
127     }
128 
set_sn(void * dst_sn,ucs_memory_type_t dst_mem_type,const void * src_sn) const129     inline void set_sn(void *dst_sn,
130                        ucs_memory_type_t dst_mem_type,
131                        const void *src_sn) const {
132         if (ucs_likely(m_perf.allocator->mem_type == UCS_MEMORY_TYPE_HOST)) {
133             ucs_assert(dst_mem_type == UCS_MEMORY_TYPE_HOST);
134             *reinterpret_cast<psn_t*>(dst_sn) = *reinterpret_cast<const psn_t*>(src_sn);
135         }
136 
137         m_perf.allocator->memcpy(dst_sn, dst_mem_type,
138                                  src_sn, UCS_MEMORY_TYPE_HOST,
139                                  sizeof(psn_t));
140     }
141 
get_sn(const volatile void * sn,ucs_memory_type_t mem_type) const142     inline psn_t get_sn(const volatile void *sn,
143                         ucs_memory_type_t mem_type) const {
144         if (ucs_likely(mem_type == UCS_MEMORY_TYPE_HOST)) {
145             return *reinterpret_cast<const volatile psn_t*>(sn);
146         }
147 
148         psn_t host_sn;
149         m_perf.allocator->memcpy(&host_sn, UCS_MEMORY_TYPE_HOST,
150                                  const_cast<const void*>(sn),
151                                  mem_type, sizeof(psn_t));
152         return host_sn;
153     }
154 
set_recv_sn(void * recv_sn,ucs_memory_type_t recv_mem_type,const void * src_sn) const155     inline void set_recv_sn(void *recv_sn,
156                             ucs_memory_type_t recv_mem_type,
157                             const void *src_sn) const {
158         if (CMD == UCX_PERF_CMD_AM) {
159             ucs_assert(&m_last_recvd_sn == recv_sn);
160             *(psn_t*)recv_sn = *(const psn_t*)src_sn;
161         } else {
162             set_sn(recv_sn, recv_mem_type, src_sn);
163         }
164     }
165 
get_recv_sn(const volatile void * recv_sn,ucs_memory_type_t recv_mem_type) const166     inline psn_t get_recv_sn(const volatile void *recv_sn,
167                              ucs_memory_type_t recv_mem_type) const {
168         if (CMD == UCX_PERF_CMD_AM) {
169             /* it has to be updated after AM completion */
170             ucs_assert(&m_last_recvd_sn == recv_sn);
171             return m_last_recvd_sn;
172         } else {
173             return get_sn(recv_sn, recv_mem_type);
174         }
175     }
176 
progress_responder()177     void UCS_F_ALWAYS_INLINE progress_responder() {
178         if (!ONESIDED) {
179             uct_worker_progress(m_perf.uct.worker);
180         }
181     }
182 
    /* Progress the worker on the requestor side (always needed) */
    void UCS_F_ALWAYS_INLINE progress_requestor() {
        uct_worker_progress(m_perf.uct.worker);
    }
186 
wait_for_window(bool send_window)187     void UCS_F_ALWAYS_INLINE wait_for_window(bool send_window)
188     {
189         while (send_window && (outstanding() >= m_max_outstanding)) {
190             progress_requestor();
191         }
192     }
193 
am_hander(void * arg,void * data,size_t length,unsigned flags)194     static ucs_status_t am_hander(void *arg, void *data, size_t length,
195                                   unsigned flags)
196     {
197         /* we always assume that buffers provided by TLs are host memory */
198         ucs_assert(UCS_CIRCULAR_COMPARE8(*(psn_t*)arg, <=, *(psn_t*)data));
199         *(psn_t*)arg = *(psn_t*)data;
200         return UCS_OK;
201     }
202 
pack_cb(void * dest,void * arg)203     static size_t pack_cb(void *dest, void *arg)
204     {
205         uct_perf_test_runner *self = (uct_perf_test_runner *)arg;
206         size_t length = ucx_perf_get_message_size(&self->m_perf.params);
207 
208         self->m_perf.allocator->memcpy(/* we always assume that buffers
209                                         * provided by TLs are host memory */
210                                        dest, UCS_MEMORY_TYPE_HOST,
211                                        self->m_perf.send_buffer,
212                                        self->m_perf.uct.send_mem.mem_type,
213                                        length);
214 
215         return length;
216     }
217 
unpack_cb(void * arg,const void * data,size_t length)218     static void unpack_cb(void *arg, const void *data, size_t length)
219     {
220         uct_perf_test_runner *self = (uct_perf_test_runner *)arg;
221 
222         self->m_perf.allocator->memcpy(self->m_perf.send_buffer,
223                                        self->m_perf.uct.send_mem.mem_type,
224                                        /* we always assume that buffers
225                                         * provided by TLs are host memory */
226                                        data, UCS_MEMORY_TYPE_HOST,
227                                        length);
228     }
229 
    /**
     * Issue one non-blocking operation for the configured CMD/DATA layout.
     *
     * @param sn          sequence number to transmit with this operation
     * @param prev_sn     previously sent SN (delta base for ADD/FADD,
     *                    compare value for CSWAP)
     * @param buffer      local buffer to send from / fetch into
     * @param length      total message length in bytes
     * @param comp        completion passed to zcopy/fetch operations
     *
     * @return UCS_OK, UCS_INPROGRESS, UCS_ERR_NO_RESOURCE, or another error
     *         status from the transport.
     */
    ucs_status_t UCS_F_ALWAYS_INLINE
    send(uct_ep_h ep, psn_t sn, psn_t prev_sn, void *buffer, unsigned length,
         uint64_t remote_addr, uct_rkey_t rkey, uct_completion_t *comp)
    {
        uint64_t am_short_hdr;
        size_t header_size;
        ssize_t packed_len;

        /* coverity[switch_selector_expr_is_constant] */
        switch (CMD) {
        case UCX_PERF_CMD_AM:
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_SHORT:
                /* the SN travels in the 64-bit AM header word */
                am_short_hdr = sn;
                return uct_ep_am_short(ep, UCT_PERF_TEST_AM_ID, am_short_hdr,
                                       (char*)buffer + sizeof(am_short_hdr),
                                       length - sizeof(am_short_hdr));
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                packed_len = uct_ep_am_bcopy(ep, UCT_PERF_TEST_AM_ID, pack_cb,
                                             (void*)this, 0);
                /* bcopy returns packed size on success, negative status on error */
                return (packed_len >= 0) ? UCS_OK : (ucs_status_t)packed_len;
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                header_size = m_perf.params.am_hdr_size;
                return uct_ep_am_zcopy(ep, UCT_PERF_TEST_AM_ID, buffer, header_size,
                                       m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                       0, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_PUT:
            if (TYPE == UCX_PERF_TEST_TYPE_PINGPONG) {
                /* Put the control word at the latest byte of the IOV message */
                set_sn(UCS_PTR_BYTE_OFFSET(buffer,
                                           uct_perf_get_buffer_extent(&m_perf.params) - 1),
                       m_perf.uct.send_mem.mem_type, &sn);
            }
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_SHORT:
                return uct_ep_put_short(ep, buffer, length, remote_addr, rkey);
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                packed_len = uct_ep_put_bcopy(ep, pack_cb, (void*)this, remote_addr, rkey);
                return (packed_len >= 0) ? UCS_OK : (ucs_status_t)packed_len;
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                return uct_ep_put_zcopy(ep, m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                        remote_addr, rkey, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_GET:
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                return uct_ep_get_bcopy(ep, unpack_cb, (void*)this,
                                        length, remote_addr, rkey, comp);
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                return uct_ep_get_zcopy(ep, m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                        remote_addr, rkey, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        /* atomics: operand width is selected by the message length */
        case UCX_PERF_CMD_ADD:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_post(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn, remote_addr, rkey);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_post(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn, remote_addr, rkey);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_FADD:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_fetch(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn,
                                             (uint32_t*)buffer, remote_addr, rkey, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_fetch(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn,
                                             (uint64_t*)buffer, remote_addr, rkey, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_SWAP:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_fetch(ep, UCT_ATOMIC_OP_SWAP, sn,
                                             (uint32_t*)buffer, remote_addr, rkey, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_fetch(ep, UCT_ATOMIC_OP_SWAP, sn,
                                             (uint64_t*)buffer, remote_addr, rkey, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_CSWAP:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic_cswap32(ep, prev_sn, sn, remote_addr, rkey,
                                             (uint32_t*)buffer, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic_cswap64(ep, prev_sn, sn, remote_addr, rkey,
                                             (uint64_t*)buffer, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        default:
            return UCS_ERR_INVALID_PARAM;
        }
    }
336 
    /**
     * Blocking send: retry on UCS_ERR_NO_RESOURCE until the transport accepts
     * the operation.
     *
     * On UCS_INPROGRESS the shared completion counter is incremented so that
     * outstanding() reflects the in-flight operation. On immediate success,
     * progress is driven only once every N_SEND_B_PER_PROGRESS calls to keep
     * the fast path cheap. Other errors are logged and swallowed.
     */
    void UCS_F_ALWAYS_INLINE
    send_b(uct_ep_h ep, psn_t sn, psn_t prev_sn, void *buffer, unsigned length,
           uint64_t remote_addr, uct_rkey_t rkey, uct_completion_t *comp)
    {
        ucs_status_t status;
        for (;;) {
            status = send(ep, sn, prev_sn, buffer, length, remote_addr, rkey, comp);
            if (ucs_likely(status == UCS_OK)) {
                if ((m_send_b_count++ % N_SEND_B_PER_PROGRESS) == 0) {
                    progress_requestor();
                }
                return;
            } else if (status == UCS_INPROGRESS) {
                /* operation is in flight; completion callback will decrement */
                ++m_completion.count;
                progress_requestor();
                ucs_assert((comp == NULL) || (outstanding() <= m_max_outstanding));
                return;
            } else if (status == UCS_ERR_NO_RESOURCE) {
                /* transport temporarily out of resources - progress and retry */
                progress_requestor();
                continue;
            } else {
                ucs_error("Failed to send: %s", ucs_status_string(status));
                return;
            }
        };
    }
363 
flush(int peer_index)364     void flush(int peer_index)
365     {
366         if (m_perf.params.flags & UCX_PERF_TEST_FLAG_FLUSH_EP) {
367             uct_perf_ep_flush_b(&m_perf, peer_index);
368         } else {
369             uct_perf_iface_flush_b(&m_perf);
370         }
371     }
372 
    /**
     * Ping-pong loop: rank 0 sends SN and busy-waits until the same SN is
     * observed in its receive location; rank 1 does the mirror image.
     * Supported commands: AM (SN in m_last_recvd_sn), ADD (SN at buffer
     * start), PUT (SN at the last byte of the buffer).
     *
     * @return UCS_OK, or UCS_ERR_INVALID_PARAM for unsupported commands.
     */
    ucs_status_t run_pingpong()
    {
        psn_t send_sn, *recv_sn, sn;
        unsigned my_index;
        uct_ep_h ep;
        uint64_t remote_addr;
        uct_rkey_t rkey;
        void *buffer;
        size_t length;

        length = ucx_perf_get_message_size(&m_perf.params);
        ucs_assert(length >= sizeof(psn_t));

        /* pick where to poll for the peer's SN, per command */
        /* coverity[switch_selector_expr_is_constant] */
        switch (CMD) {
        case UCX_PERF_CMD_AM:
            recv_sn = &m_last_recvd_sn;
            break;
        case UCX_PERF_CMD_ADD:
            recv_sn = (psn_t*)m_perf.recv_buffer;
            break;
        case UCX_PERF_CMD_PUT:
            /* since polling on data, must be end of the buffer */
            recv_sn = (psn_t*)m_perf.recv_buffer + length - 1;
            break;
        default:
            ucs_error("Cannot run this test in ping-pong mode");
            return UCS_ERR_INVALID_PARAM;
        }

        uct_perf_test_prepare_iov_buffer();

        /* seed the receive location with an SN that will never be sent */
        sn = std::numeric_limits<uint8_t>::max();
        set_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type, &sn);

        uct_perf_barrier(&m_perf);

        my_index = rte_call(&m_perf, group_index);

        ucx_perf_test_start_clock(&m_perf);

        buffer      = m_perf.send_buffer;
        remote_addr = m_perf.uct.peers[1 - my_index].remote_addr;
        rkey        = m_perf.uct.peers[1 - my_index].rkey.rkey;
        ep          = m_perf.uct.peers[1 - my_index].ep;

        send_sn = 0;
        if (my_index == 0) {
            /* rank 0: send first, then wait for echo */
            UCX_PERF_TEST_FOREACH(&m_perf) {
                send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                       rkey, NULL);
                ucx_perf_update(&m_perf, 1, length);

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (sn != send_sn);

                ++send_sn;
            }
        } else if (my_index == 1) {
            /* rank 1: wait for the peer's SN, then echo it back */
            UCX_PERF_TEST_FOREACH(&m_perf) {
                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (sn != send_sn);

                send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                       rkey, NULL);
                ucx_perf_update(&m_perf, 1, length);
                ++send_sn;
            }
        }

        flush(1 - my_index);
        ucx_perf_get_time(&m_perf);
        return UCS_OK;
    }
451 
    /**
     * Unidirectional stream test: rank 1 is the requestor, rank 0 the responder.
     *
     * @param flow_control           requestor throttles itself to fc_window
     *                               unacknowledged SNs; responder ACKs every
     *                               half-window (required for AM tests)
     * @param send_window            limit in-flight sends to m_max_outstanding
     *                               (for operations that return UCS_INPROGRESS)
     * @param direction_to_responder true when data lands at the responder
     *                               (PUT/AM/atomics); false for GET
     *
     * Without flow control, termination is signaled by a sentinel SN of 2.
     */
    ucs_status_t run_stream_req_uni(bool flow_control, bool send_window,
                                    bool direction_to_responder)
    {
        unsigned long remote_addr;
        volatile psn_t *recv_sn;
        psn_t sn, send_sn;
        uct_rkey_t rkey;
        void *buffer;
        unsigned fc_window;
        unsigned my_index;
        unsigned length;
        uct_ep_h ep;

        length = ucx_perf_get_message_size(&m_perf.params);
        ucs_assert(length >= sizeof(psn_t));
        /* fc_window must fit in half the 8-bit SN space for circular compare */
        ucs_assert(m_perf.params.uct.fc_window <= ((psn_t)-1) / 2);

        m_perf.allocator->memset(m_perf.send_buffer, 0, length);
        m_perf.allocator->memset(m_perf.recv_buffer, 0, length);

        uct_perf_test_prepare_iov_buffer();

        /* where each side observes the peer's (or its own fetched) SN */
        recv_sn  = (direction_to_responder ?
                    ((CMD == UCX_PERF_CMD_AM) ?
                     &m_last_recvd_sn :
                     (psn_t*)m_perf.recv_buffer) :
                    (psn_t*)m_perf.send_buffer);
        my_index = rte_call(&m_perf, group_index);

        uct_perf_barrier(&m_perf);

        ucx_perf_test_start_clock(&m_perf);

        ep          = m_perf.uct.peers[1 - my_index].ep;
        buffer      = m_perf.send_buffer;
        remote_addr = m_perf.uct.peers[1 - my_index].remote_addr;
        rkey        = m_perf.uct.peers[1 - my_index].rkey.rkey;
        fc_window   = m_perf.params.uct.fc_window;

        if (my_index == 1) {
            /* ---- requestor ---- */
            /* send_sn is the next SN to send */
            if (flow_control) {
                send_sn     = 1;
            } else{
                send_sn     = 0; /* Remote buffer will remain 0 throughout the test */
            }

            set_sn(buffer, m_perf.uct.send_mem.mem_type, &send_sn);

            UCX_PERF_TEST_FOREACH(&m_perf) {
                if (flow_control) {
                    /* Wait until getting ACK from responder */
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    ucs_assertv(UCS_CIRCULAR_COMPARE8(send_sn - 1, >=, sn),
                                "recv_sn=%d iters=%ld", sn, m_perf.current.iters);

                    while (UCS_CIRCULAR_COMPARE8(send_sn, >, sn + fc_window)) {
                        progress_responder();
                        sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    }
                }

                /* Wait until we have enough sends completed, then take
                 * the next completion handle in the window. */
                wait_for_window(send_window);

                if (flow_control) {
                    send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                           rkey, &m_completion);
                    ++send_sn;
                } else {
                    send_b(ep, send_sn, send_sn, buffer, length, remote_addr,
                           rkey, &m_completion);
                }

                ucx_perf_update(&m_perf, 1, length);
            }

            if (!flow_control) {
                sn = 2;
                /* Send "sentinel" value */
                if (direction_to_responder) {
                    wait_for_window(send_window);
                    set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                    send_b(ep, 2, send_sn, buffer, length, remote_addr, rkey,
                           &m_completion);
                } else {
                    /* GET: plant the sentinel locally so the responder fetches it */
                    set_sn(m_perf.recv_buffer,
                           m_perf.uct.recv_mem.mem_type,
                           &sn);
                }
            } else {
                /* Wait for last ACK, to make sure no more messages will arrive. */
                ucs_assert(direction_to_responder);

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (UCS_CIRCULAR_COMPARE8((psn_t)(send_sn - 1), >, sn));
            }
        } else if (my_index == 0) {
            /* ---- responder ---- */
            if (flow_control) {
                /* Since we're doing flow control, we can count exactly how
                 * many packets were received.
                 */
                send_sn = (psn_t)-1; /* Last SN we have sent (as acknowledgment) */
                ucs_assert(direction_to_responder);
                UCX_PERF_TEST_FOREACH(&m_perf) {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);

                    if (UCS_CIRCULAR_COMPARE8(sn, >, (psn_t)(send_sn + (fc_window / 2)))) {
                        /* Send ACK every half-window */
                        wait_for_window(send_window);
                        send_b(ep, sn, send_sn, buffer, length, remote_addr,
                               rkey, &m_completion);
                        send_sn = sn;
                    }

                    /* Calculate number of iterations */
                    m_perf.current.iters +=
                                    (psn_t)(sn - (psn_t)m_perf.current.iters);
                }

                /* Send ACK for last packet */
                sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                if (UCS_CIRCULAR_COMPARE8(sn, >, send_sn)) {
                    wait_for_window(send_window);
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    send_b(ep, sn, send_sn, buffer, length, remote_addr,
                           rkey, &m_completion);
                }
            } else {
                /* Wait for "sentinel" value */
                ucs_time_t poll_time = ucs_get_time();

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    if (!direction_to_responder) {
                        /* GET: keep re-issuing fetches (rate-limited to ~1ms)
                         * until the sentinel shows up in the fetched data */
                        if (ucs_get_time() > poll_time + ucs_time_from_msec(1.0)) {
                            wait_for_window(send_window);
                            send_b(ep, 0, 0, buffer, length, remote_addr, rkey,
                                   &m_completion);
                            poll_time = ucs_get_time();
                        }
                    }
                } while (sn != 2);
            }
        }

        flush(1 - my_index);
        ucx_perf_get_time(&m_perf);
        ucs_assert(outstanding() == 0);
        if (my_index == 1) {
            ucx_perf_update(&m_perf, 0, 0);
        }

        return UCS_OK;
    }
612 
    /**
     * Dispatch to the test loop selected by the TYPE/CMD template parameters.
     *
     * @return test status, or UCS_ERR_INVALID_PARAM for unsupported
     *         combinations (e.g. bidirectional stream).
     */
    ucs_status_t run()
    {
        bool zcopy = (DATA == UCT_PERF_DATA_LAYOUT_ZCOPY);

        /* coverity[switch_selector_expr_is_constant] */
        switch (TYPE) {
        case UCX_PERF_TEST_TYPE_PINGPONG:
            return run_pingpong();
        case UCX_PERF_TEST_TYPE_STREAM_UNI:
            /* coverity[switch_selector_expr_is_constant] */
            switch (CMD) {
            case UCX_PERF_CMD_PUT:
                return run_stream_req_uni(false, /* No need for flow control for RMA */
                                          zcopy, /* ZCOPY can return INPROGRESS */
                                          true /* data goes to responder */);
            case UCX_PERF_CMD_ADD:
                return run_stream_req_uni(false, /* No need for flow control for RMA */
                                          false, /* This atomic does not wait for reply */
                                          true /* Data goes to responder */);
            case UCX_PERF_CMD_AM:
                return run_stream_req_uni(true, /* Need flow control for active messages,
                                                   because they are handled in SW */
                                          zcopy, /* ZCOPY can return INPROGRESS */
                                          true /* data goes to responder */);
            case UCX_PERF_CMD_GET:
                return run_stream_req_uni(false, /* No flow control for RMA/AMO */
                                          true, /* Waiting for replies */
                                          false /* For GET, data is delivered to requester */ );
            case UCX_PERF_CMD_FADD:
            case UCX_PERF_CMD_SWAP:
            case UCX_PERF_CMD_CSWAP:
                return run_stream_req_uni(false, /* No flow control for RMA/AMO */
                                          true, /* Waiting for replies */
                                          true /* For atomics, data goes both ways, but
                                                     the request is easier to predict */ );
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_TEST_TYPE_STREAM_BI:
        default:
            return UCS_ERR_INVALID_PARAM;
        }
    }
656 
657 private:
    /* Number of operations still in flight (completion count starts at 1) */
    inline unsigned outstanding() {
        return m_completion.count - 1;
    }
661 
662     ucx_perf_context_t &m_perf;
663     const unsigned     m_max_outstanding;
664     uct_completion_t   m_completion;
665     int                m_send_b_count;
666     /* this is only valid for UCT AM tests */
667     psn_t              m_last_recvd_sn;
668     const static int   N_SEND_B_PER_PROGRESS = 16;
669 };
670 
671 
/*
 * If the requested (command, test type, data layout, one-sided flag) matches
 * this combination, instantiate the corresponding template specialization of
 * uct_perf_test_runner and run it.
 */
#define TEST_CASE(_perf, _cmd, _type, _data, _onesided) \
    if (((_perf)->params.command == (_cmd)) && \
        ((_perf)->params.test_type == (_type)) && \
        ((_perf)->params.uct.data_layout == (_data)) && \
        (!!((_perf)->params.flags & UCX_PERF_TEST_FLAG_ONE_SIDED) == !!(_onesided))) \
    { \
        uct_perf_test_runner<_cmd, _type, _data, _onesided> r(*_perf); \
        return r.run(); \
    }
/* Expand TEST_CASE for both one-sided and two-sided variants */
#define TEST_CASE_ALL_OSD(_perf, _case, _data) \
   TEST_CASE(_perf, UCS_PP_TUPLE_0 _case, UCS_PP_TUPLE_1 _case, _data, true) \
   TEST_CASE(_perf, UCS_PP_TUPLE_0 _case, UCS_PP_TUPLE_1 _case, _data, false)
/* Expand TEST_CASE_ALL_OSD for every data layout */
#define TEST_CASE_ALL_DATA(_perf, _case) \
   TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_SHORT) \
   TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_BCOPY) \
   TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_ZCOPY)

/**
 * Select and run the UCT perf test matching perf->params.
 *
 * @return the test's status, or UCS_ERR_INVALID_PARAM if no supported
 *         (command, type, layout, onesided) combination matches.
 */
ucs_status_t uct_perf_test_dispatch(ucx_perf_context_t *perf)
{
    UCS_PP_FOREACH(TEST_CASE_ALL_DATA, perf,
        (UCX_PERF_CMD_AM,  UCX_PERF_TEST_TYPE_PINGPONG),
        (UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG),
        (UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_PINGPONG),
        (UCX_PERF_CMD_AM,  UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI)
        );

    ucs_error("Invalid test case");
    return UCS_ERR_INVALID_PARAM;
}
707