1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
3 * Copyright (C) The University of Tennessee and The University
4 * of Tennessee Research Foundation. 2016. ALL RIGHTS RESERVED.
5 *
6 * See file LICENSE for terms.
7 */
8
9 #ifdef HAVE_CONFIG_H
10 # include "config.h"
11 #endif
12
13 #include <tools/perf/lib/libperf_int.h>
14
15 extern "C" {
16 #include <ucs/debug/log.h>
17 #include <ucs/sys/preprocessor.h>
18 #include <ucs/sys/math.h>
19 #include <ucs/sys/sys.h>
20 }
21
22 #include <limits>
23
/**
 * UCT-level performance test driver. The command (CMD), test type (TYPE),
 * data layout (DATA) and one-sided flag are compile-time template parameters,
 * so each instantiation compiles into a specialized fast path.
 */
template <ucx_perf_cmd_t CMD, ucx_perf_test_type_t TYPE, uct_perf_data_layout_t DATA, bool ONESIDED>
class uct_perf_test_runner {
public:

    /* Packet sequence number. Deliberately 8 bits wide: all comparisons go
     * through UCS_CIRCULAR_COMPARE8, which handles wrap-around. */
    typedef uint8_t psn_t;

    /**
     * Capture the perf context and install the AM handler (when the
     * interface supports active messages) so received SNs land in
     * m_last_recvd_sn.
     */
    uct_perf_test_runner(ucx_perf_context_t &perf) :
        m_perf(perf),
        m_max_outstanding(m_perf.params.max_outstanding),
        m_send_b_count(0)

    {
        ucs_assert_always(m_max_outstanding > 0);

        /* count stays at 1 when idle; outstanding() reports count - 1 */
        m_completion.count = 1;
        m_completion.func = NULL;
        m_last_recvd_sn = 0;

        ucs_status_t status;
        uct_iface_attr_t attr;
        status = uct_iface_query(m_perf.uct.iface, &attr);
        ucs_assert_always(status == UCS_OK);
        if (attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT |
                              UCT_IFACE_FLAG_AM_BCOPY |
                              UCT_IFACE_FLAG_AM_ZCOPY)) {
            /* am_hander stores each arriving SN into m_last_recvd_sn */
            status = uct_iface_set_am_handler(m_perf.uct.iface,
                                              UCT_PERF_TEST_AM_ID, am_hander,
                                              (void*)&m_last_recvd_sn, 0);
            ucs_assert_always(status == UCS_OK);
        }
    }
55
    /* Uninstall the AM handler installed by the constructor (no-op if the
     * interface had no AM capability). */
    ~uct_perf_test_runner() {
        uct_iface_set_am_handler(m_perf.uct.iface, UCT_PERF_TEST_AM_ID, NULL,
                                 NULL, 0);
    }
60
61 /**
62 * Make uct_iov_t iov[msg_size_cnt] array with pointer elements to
63 * original buffer
64 */
uct_perf_get_buffer_iov(uct_iov_t * iov,void * buffer,unsigned header_size,uct_mem_h memh,const ucx_perf_context_t * perf)65 static void uct_perf_get_buffer_iov(uct_iov_t *iov, void *buffer,
66 unsigned header_size, uct_mem_h memh,
67 const ucx_perf_context_t *perf)
68 {
69 const size_t iovcnt = perf->params.msg_size_cnt;
70 size_t iov_length_it, iov_it;
71
72 ucs_assert(UCT_PERF_DATA_LAYOUT_ZCOPY == DATA);
73 ucs_assert(NULL != perf->params.msg_size_list);
74 ucs_assert(iovcnt > 0);
75 ucs_assert(perf->params.msg_size_list[0] >= header_size);
76
77 iov_length_it = 0;
78 for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
79 iov[iov_it].buffer = (char *)buffer + iov_length_it + header_size;
80 iov[iov_it].length = perf->params.msg_size_list[iov_it] - header_size;
81 iov[iov_it].memh = memh;
82 iov[iov_it].stride = 0;
83 iov[iov_it].count = 1;
84
85 if (perf->params.iov_stride) {
86 iov_length_it += perf->params.iov_stride - header_size;
87 } else {
88 iov_length_it += iov[iov_it].length;
89 }
90
91 header_size = 0; /* should be zero for next iterations */
92 }
93
94 ucs_debug("IOV buffer filled by %lu slices with total length %lu",
95 iovcnt, iov_length_it);
96 }
97
uct_perf_test_prepare_iov_buffer()98 void uct_perf_test_prepare_iov_buffer() {
99 if (UCT_PERF_DATA_LAYOUT_ZCOPY == DATA) {
100 size_t start_iov_buffer_size = 0;
101 if (UCX_PERF_CMD_AM == CMD) {
102 start_iov_buffer_size = m_perf.params.am_hdr_size;
103 }
104 uct_perf_get_buffer_iov(m_perf.uct.iov, m_perf.send_buffer,
105 start_iov_buffer_size,
106 m_perf.uct.send_mem.memh,
107 &m_perf);
108 }
109 }
110
111 /**
112 * Get the length between beginning of the IOV first buffer and the latest byte
113 * in the latest IOV buffer.
114 */
uct_perf_get_buffer_extent(const ucx_perf_params_t * params)115 size_t uct_perf_get_buffer_extent(const ucx_perf_params_t *params)
116 {
117 size_t length;
118
119 if ((UCT_PERF_DATA_LAYOUT_ZCOPY == DATA) && params->iov_stride) {
120 length = ((params->msg_size_cnt - 1) * params->iov_stride) +
121 params->msg_size_list[params->msg_size_cnt - 1];
122 } else {
123 length = ucx_perf_get_message_size(params);
124 }
125
126 return length;
127 }
128
set_sn(void * dst_sn,ucs_memory_type_t dst_mem_type,const void * src_sn) const129 inline void set_sn(void *dst_sn,
130 ucs_memory_type_t dst_mem_type,
131 const void *src_sn) const {
132 if (ucs_likely(m_perf.allocator->mem_type == UCS_MEMORY_TYPE_HOST)) {
133 ucs_assert(dst_mem_type == UCS_MEMORY_TYPE_HOST);
134 *reinterpret_cast<psn_t*>(dst_sn) = *reinterpret_cast<const psn_t*>(src_sn);
135 }
136
137 m_perf.allocator->memcpy(dst_sn, dst_mem_type,
138 src_sn, UCS_MEMORY_TYPE_HOST,
139 sizeof(psn_t));
140 }
141
get_sn(const volatile void * sn,ucs_memory_type_t mem_type) const142 inline psn_t get_sn(const volatile void *sn,
143 ucs_memory_type_t mem_type) const {
144 if (ucs_likely(mem_type == UCS_MEMORY_TYPE_HOST)) {
145 return *reinterpret_cast<const volatile psn_t*>(sn);
146 }
147
148 psn_t host_sn;
149 m_perf.allocator->memcpy(&host_sn, UCS_MEMORY_TYPE_HOST,
150 const_cast<const void*>(sn),
151 mem_type, sizeof(psn_t));
152 return host_sn;
153 }
154
set_recv_sn(void * recv_sn,ucs_memory_type_t recv_mem_type,const void * src_sn) const155 inline void set_recv_sn(void *recv_sn,
156 ucs_memory_type_t recv_mem_type,
157 const void *src_sn) const {
158 if (CMD == UCX_PERF_CMD_AM) {
159 ucs_assert(&m_last_recvd_sn == recv_sn);
160 *(psn_t*)recv_sn = *(const psn_t*)src_sn;
161 } else {
162 set_sn(recv_sn, recv_mem_type, src_sn);
163 }
164 }
165
get_recv_sn(const volatile void * recv_sn,ucs_memory_type_t recv_mem_type) const166 inline psn_t get_recv_sn(const volatile void *recv_sn,
167 ucs_memory_type_t recv_mem_type) const {
168 if (CMD == UCX_PERF_CMD_AM) {
169 /* it has to be updated after AM completion */
170 ucs_assert(&m_last_recvd_sn == recv_sn);
171 return m_last_recvd_sn;
172 } else {
173 return get_sn(recv_sn, recv_mem_type);
174 }
175 }
176
progress_responder()177 void UCS_F_ALWAYS_INLINE progress_responder() {
178 if (!ONESIDED) {
179 uct_worker_progress(m_perf.uct.worker);
180 }
181 }
182
    /* Progress the worker on the requestor side (always). */
    void UCS_F_ALWAYS_INLINE progress_requestor() {
        uct_worker_progress(m_perf.uct.worker);
    }
186
wait_for_window(bool send_window)187 void UCS_F_ALWAYS_INLINE wait_for_window(bool send_window)
188 {
189 while (send_window && (outstanding() >= m_max_outstanding)) {
190 progress_requestor();
191 }
192 }
193
    /**
     * Active-message handler: record the SN carried in the message payload
     * into *arg (which the constructor points at m_last_recvd_sn).
     * (Name keeps the historical "hander" spelling used at registration.)
     */
    static ucs_status_t am_hander(void *arg, void *data, size_t length,
                                  unsigned flags)
    {
        /* we always assume that buffers provided by TLs are host memory */
        /* SNs must arrive in order (modulo 8-bit wrap) */
        ucs_assert(UCS_CIRCULAR_COMPARE8(*(psn_t*)arg, <=, *(psn_t*)data));
        *(psn_t*)arg = *(psn_t*)data;
        return UCS_OK;
    }
202
pack_cb(void * dest,void * arg)203 static size_t pack_cb(void *dest, void *arg)
204 {
205 uct_perf_test_runner *self = (uct_perf_test_runner *)arg;
206 size_t length = ucx_perf_get_message_size(&self->m_perf.params);
207
208 self->m_perf.allocator->memcpy(/* we always assume that buffers
209 * provided by TLs are host memory */
210 dest, UCS_MEMORY_TYPE_HOST,
211 self->m_perf.send_buffer,
212 self->m_perf.uct.send_mem.mem_type,
213 length);
214
215 return length;
216 }
217
unpack_cb(void * arg,const void * data,size_t length)218 static void unpack_cb(void *arg, const void *data, size_t length)
219 {
220 uct_perf_test_runner *self = (uct_perf_test_runner *)arg;
221
222 self->m_perf.allocator->memcpy(self->m_perf.send_buffer,
223 self->m_perf.uct.send_mem.mem_type,
224 /* we always assume that buffers
225 * provided by TLs are host memory */
226 data, UCS_MEMORY_TYPE_HOST,
227 length);
228 }
229
    /**
     * Issue one non-blocking operation, specialized at compile time by
     * CMD/DATA (the nested switches fold to a single call).
     *
     * @param sn       sequence number to transmit
     * @param prev_sn  previously sent SN (delta base for ADD/FADD, compare
     *                 value for CSWAP)
     * @param comp     completion used by operations that may return
     *                 UCS_INPROGRESS
     * @return UCS_OK, UCS_INPROGRESS, UCS_ERR_NO_RESOURCE or another error.
     */
    ucs_status_t UCS_F_ALWAYS_INLINE
    send(uct_ep_h ep, psn_t sn, psn_t prev_sn, void *buffer, unsigned length,
         uint64_t remote_addr, uct_rkey_t rkey, uct_completion_t *comp)
    {
        uint64_t am_short_hdr;
        size_t header_size;
        ssize_t packed_len;

        /* coverity[switch_selector_expr_is_constant] */
        switch (CMD) {
        case UCX_PERF_CMD_AM:
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_SHORT:
                /* SN travels in the 64-bit AM header */
                am_short_hdr = sn;
                return uct_ep_am_short(ep, UCT_PERF_TEST_AM_ID, am_short_hdr,
                                       (char*)buffer + sizeof(am_short_hdr),
                                       length - sizeof(am_short_hdr));
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                /* bcopy returns the packed length on success */
                packed_len = uct_ep_am_bcopy(ep, UCT_PERF_TEST_AM_ID, pack_cb,
                                             (void*)this, 0);
                return (packed_len >= 0) ? UCS_OK : (ucs_status_t)packed_len;
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                header_size = m_perf.params.am_hdr_size;
                return uct_ep_am_zcopy(ep, UCT_PERF_TEST_AM_ID, buffer, header_size,
                                       m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                       0, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_PUT:
            if (TYPE == UCX_PERF_TEST_TYPE_PINGPONG) {
                /* Put the control word at the latest byte of the IOV message */
                set_sn(UCS_PTR_BYTE_OFFSET(buffer,
                                           uct_perf_get_buffer_extent(&m_perf.params) - 1),
                       m_perf.uct.send_mem.mem_type, &sn);
            }
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_SHORT:
                return uct_ep_put_short(ep, buffer, length, remote_addr, rkey);
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                packed_len = uct_ep_put_bcopy(ep, pack_cb, (void*)this, remote_addr, rkey);
                return (packed_len >= 0) ? UCS_OK : (ucs_status_t)packed_len;
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                return uct_ep_put_zcopy(ep, m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                        remote_addr, rkey, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_GET:
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                return uct_ep_get_bcopy(ep, unpack_cb, (void*)this,
                                        length, remote_addr, rkey, comp);
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                return uct_ep_get_zcopy(ep, m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                        remote_addr, rkey, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_ADD:
            /* post the SN delta; operand width selected by message length */
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_post(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn, remote_addr, rkey);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_post(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn, remote_addr, rkey);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_FADD:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_fetch(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn,
                                             (uint32_t*)buffer, remote_addr, rkey, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_fetch(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn,
                                             (uint64_t*)buffer, remote_addr, rkey, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_SWAP:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_fetch(ep, UCT_ATOMIC_OP_SWAP, sn,
                                             (uint32_t*)buffer, remote_addr, rkey, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_fetch(ep, UCT_ATOMIC_OP_SWAP, sn,
                                             (uint64_t*)buffer, remote_addr, rkey, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_CSWAP:
            /* compare against prev_sn, install sn; fetched value -> buffer */
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic_cswap32(ep, prev_sn, sn, remote_addr, rkey,
                                             (uint32_t*)buffer, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic_cswap64(ep, prev_sn, sn, remote_addr, rkey,
                                             (uint64_t*)buffer, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        default:
            return UCS_ERR_INVALID_PARAM;
        }
    }
336
    /**
     * Blocking wrapper around send(): retries on UCS_ERR_NO_RESOURCE,
     * accounts for UCS_INPROGRESS completions in m_completion.count, and
     * progresses the requestor periodically on the fast path.
     */
    void UCS_F_ALWAYS_INLINE
    send_b(uct_ep_h ep, psn_t sn, psn_t prev_sn, void *buffer, unsigned length,
           uint64_t remote_addr, uct_rkey_t rkey, uct_completion_t *comp)
    {
        ucs_status_t status;
        for (;;) {
            status = send(ep, sn, prev_sn, buffer, length, remote_addr, rkey, comp);
            if (ucs_likely(status == UCS_OK)) {
                /* amortize progress cost: one call per N completed sends */
                if ((m_send_b_count++ % N_SEND_B_PER_PROGRESS) == 0) {
                    progress_requestor();
                }
                return;
            } else if (status == UCS_INPROGRESS) {
                /* operation holds a completion slot until its callback fires */
                ++m_completion.count;
                progress_requestor();
                ucs_assert((comp == NULL) || (outstanding() <= m_max_outstanding));
                return;
            } else if (status == UCS_ERR_NO_RESOURCE) {
                /* transport temporarily out of resources: progress and retry */
                progress_requestor();
                continue;
            } else {
                ucs_error("Failed to send: %s", ucs_status_string(status));
                return;
            }
        };
    }
363
flush(int peer_index)364 void flush(int peer_index)
365 {
366 if (m_perf.params.flags & UCX_PERF_TEST_FLAG_FLUSH_EP) {
367 uct_perf_ep_flush_b(&m_perf, peer_index);
368 } else {
369 uct_perf_iface_flush_b(&m_perf);
370 }
371 }
372
    /**
     * Ping-pong latency loop: the two peers alternate sending a message
     * carrying an 8-bit SN and polling for the peer's SN to arrive.
     * Only AM, ADD and PUT commands are supported in this mode.
     */
    ucs_status_t run_pingpong()
    {
        psn_t send_sn, *recv_sn, sn;
        unsigned my_index;
        uct_ep_h ep;
        uint64_t remote_addr;
        uct_rkey_t rkey;
        void *buffer;
        size_t length;

        length = ucx_perf_get_message_size(&m_perf.params);
        ucs_assert(length >= sizeof(psn_t));

        /* pick where the incoming SN will be observed */
        /* coverity[switch_selector_expr_is_constant] */
        switch (CMD) {
        case UCX_PERF_CMD_AM:
            recv_sn = &m_last_recvd_sn;
            break;
        case UCX_PERF_CMD_ADD:
            recv_sn = (psn_t*)m_perf.recv_buffer;
            break;
        case UCX_PERF_CMD_PUT:
            /* since polling on data, must be end of the buffer */
            recv_sn = (psn_t*)m_perf.recv_buffer + length - 1;
            break;
        default:
            ucs_error("Cannot run this test in ping-pong mode");
            return UCS_ERR_INVALID_PARAM;
        }

        uct_perf_test_prepare_iov_buffer();

        /* prime the receive SN with a value that cannot match send_sn == 0 */
        sn = std::numeric_limits<uint8_t>::max();
        set_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type, &sn);

        uct_perf_barrier(&m_perf);

        my_index = rte_call(&m_perf, group_index);

        ucx_perf_test_start_clock(&m_perf);

        buffer = m_perf.send_buffer;
        remote_addr = m_perf.uct.peers[1 - my_index].remote_addr;
        rkey = m_perf.uct.peers[1 - my_index].rkey.rkey;
        ep = m_perf.uct.peers[1 - my_index].ep;

        send_sn = 0;
        if (my_index == 0) {
            /* initiator: send first, then wait for the echo */
            UCX_PERF_TEST_FOREACH(&m_perf) {
                send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                       rkey, NULL);
                ucx_perf_update(&m_perf, 1, length);

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (sn != send_sn);

                ++send_sn;
            }
        } else if (my_index == 1) {
            /* echoer: wait for the message, then send back the same SN */
            UCX_PERF_TEST_FOREACH(&m_perf) {
                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (sn != send_sn);

                send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                       rkey, NULL);
                ucx_perf_update(&m_perf, 1, length);
                ++send_sn;
            }
        }

        flush(1 - my_index);
        ucx_perf_get_time(&m_perf);
        return UCS_OK;
    }
451
    /**
     * Unidirectional stream (bandwidth / message-rate) loop. Rank 1 is the
     * sender, rank 0 the receiver.
     *
     * @param flow_control           receiver ACKs every half fc_window so the
     *                               sender never overruns it (used for AM)
     * @param send_window            cap in-flight sends at m_max_outstanding
     * @param direction_to_responder true when data lands on the receiver side
     *                               (PUT/AM/ADD); false for GET, where the
     *                               reply lands in the requester's buffer
     */
    ucs_status_t run_stream_req_uni(bool flow_control, bool send_window,
                                    bool direction_to_responder)
    {
        unsigned long remote_addr;
        volatile psn_t *recv_sn;
        psn_t sn, send_sn;
        uct_rkey_t rkey;
        void *buffer;
        unsigned fc_window;
        unsigned my_index;
        unsigned length;
        uct_ep_h ep;

        length = ucx_perf_get_message_size(&m_perf.params);
        ucs_assert(length >= sizeof(psn_t));
        /* window must fit in half the 8-bit SN space so that
         * UCS_CIRCULAR_COMPARE8 stays unambiguous */
        ucs_assert(m_perf.params.uct.fc_window <= ((psn_t)-1) / 2);

        m_perf.allocator->memset(m_perf.send_buffer, 0, length);
        m_perf.allocator->memset(m_perf.recv_buffer, 0, length);

        uct_perf_test_prepare_iov_buffer();

        /* where to poll for the peer's SN */
        recv_sn = (direction_to_responder ?
                   ((CMD == UCX_PERF_CMD_AM) ?
                    &m_last_recvd_sn :
                    (psn_t*)m_perf.recv_buffer) :
                   (psn_t*)m_perf.send_buffer);
        my_index = rte_call(&m_perf, group_index);

        uct_perf_barrier(&m_perf);

        ucx_perf_test_start_clock(&m_perf);

        ep = m_perf.uct.peers[1 - my_index].ep;
        buffer = m_perf.send_buffer;
        remote_addr = m_perf.uct.peers[1 - my_index].remote_addr;
        rkey = m_perf.uct.peers[1 - my_index].rkey.rkey;
        fc_window = m_perf.params.uct.fc_window;

        if (my_index == 1) {
            /* ---- sender side ---- */
            /* send_sn is the next SN to send */
            if (flow_control) {
                send_sn = 1;
            } else{
                send_sn = 0; /* Remote buffer will remain 0 throughout the test */
            }

            set_sn(buffer, m_perf.uct.send_mem.mem_type, &send_sn);

            UCX_PERF_TEST_FOREACH(&m_perf) {
                if (flow_control) {
                    /* Wait until getting ACK from responder */
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    ucs_assertv(UCS_CIRCULAR_COMPARE8(send_sn - 1, >=, sn),
                                "recv_sn=%d iters=%ld", sn, m_perf.current.iters);

                    while (UCS_CIRCULAR_COMPARE8(send_sn, >, sn + fc_window)) {
                        progress_responder();
                        sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    }
                }

                /* Wait until we have enough sends completed, then take
                 * the next completion handle in the window. */
                wait_for_window(send_window);

                if (flow_control) {
                    send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                           rkey, &m_completion);
                    ++send_sn;
                } else {
                    send_b(ep, send_sn, send_sn, buffer, length, remote_addr,
                           rkey, &m_completion);
                }

                ucx_perf_update(&m_perf, 1, length);
            }

            if (!flow_control) {
                sn = 2;
                /* Send "sentinel" value */
                if (direction_to_responder) {
                    wait_for_window(send_window);
                    set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                    send_b(ep, 2, send_sn, buffer, length, remote_addr, rkey,
                           &m_completion);
                } else {
                    /* GET direction: mark our own buffer so the receiver's
                     * GETs observe the sentinel */
                    set_sn(m_perf.recv_buffer,
                           m_perf.uct.recv_mem.mem_type,
                           &sn);
                }
            } else {
                /* Wait for last ACK, to make sure no more messages will arrive. */
                ucs_assert(direction_to_responder);

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (UCS_CIRCULAR_COMPARE8((psn_t)(send_sn - 1), >, sn));
            }
        } else if (my_index == 0) {
            /* ---- receiver side ---- */
            if (flow_control) {
                /* Since we're doing flow control, we can count exactly how
                 * many packets were received.
                 */
                send_sn = (psn_t)-1; /* Last SN we have sent (as acknowledgment) */
                ucs_assert(direction_to_responder);
                UCX_PERF_TEST_FOREACH(&m_perf) {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);

                    if (UCS_CIRCULAR_COMPARE8(sn, >, (psn_t)(send_sn + (fc_window / 2)))) {
                        /* Send ACK every half-window */
                        wait_for_window(send_window);
                        send_b(ep, sn, send_sn, buffer, length, remote_addr,
                               rkey, &m_completion);
                        send_sn = sn;
                    }

                    /* Calculate number of iterations */
                    m_perf.current.iters +=
                        (psn_t)(sn - (psn_t)m_perf.current.iters);
                }

                /* Send ACK for last packet */
                sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                if (UCS_CIRCULAR_COMPARE8(sn, >, send_sn)) {
                    wait_for_window(send_window);
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    send_b(ep, sn, send_sn, buffer, length, remote_addr,
                           rkey, &m_completion);
                }
            } else {
                /* Wait for "sentinel" value */
                ucs_time_t poll_time = ucs_get_time();

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    if (!direction_to_responder) {
                        /* NOTE(review): periodic dummy send while polling,
                         * presumably to keep issuing GETs towards the peer -
                         * confirm intent */
                        if (ucs_get_time() > poll_time + ucs_time_from_msec(1.0)) {
                            wait_for_window(send_window);
                            send_b(ep, 0, 0, buffer, length, remote_addr, rkey,
                                   &m_completion);
                            poll_time = ucs_get_time();
                        }
                    }
                } while (sn != 2);
            }
        }

        flush(1 - my_index);
        ucx_perf_get_time(&m_perf);
        ucs_assert(outstanding() == 0);
        if (my_index == 1) {
            ucx_perf_update(&m_perf, 0, 0);
        }

        return UCS_OK;
    }
612
    /**
     * Select the test loop matching the compile-time TYPE/CMD and run it.
     * @return status of the test loop, or UCS_ERR_INVALID_PARAM for
     *         unsupported combinations (e.g. STREAM_BI).
     */
    ucs_status_t run()
    {
        bool zcopy = (DATA == UCT_PERF_DATA_LAYOUT_ZCOPY);

        /* coverity[switch_selector_expr_is_constant] */
        switch (TYPE) {
        case UCX_PERF_TEST_TYPE_PINGPONG:
            return run_pingpong();
        case UCX_PERF_TEST_TYPE_STREAM_UNI:
            /* coverity[switch_selector_expr_is_constant] */
            switch (CMD) {
            case UCX_PERF_CMD_PUT:
                return run_stream_req_uni(false, /* No need for flow control for RMA */
                                          zcopy, /* ZCOPY can return INPROGRESS */
                                          true /* data goes to responder */);
            case UCX_PERF_CMD_ADD:
                return run_stream_req_uni(false, /* No need for flow control for RMA */
                                          false, /* This atomic does not wait for reply */
                                          true /* Data goes to responder */);
            case UCX_PERF_CMD_AM:
                return run_stream_req_uni(true, /* Need flow control for active messages,
                                                   because they are handled in SW */
                                          zcopy, /* ZCOPY can return INPROGRESS */
                                          true /* data goes to responder */);
            case UCX_PERF_CMD_GET:
                return run_stream_req_uni(false, /* No flow control for RMA/AMO */
                                          true, /* Waiting for replies */
                                          false /* For GET, data is delivered to requester */ );
            case UCX_PERF_CMD_FADD:
            case UCX_PERF_CMD_SWAP:
            case UCX_PERF_CMD_CSWAP:
                return run_stream_req_uni(false, /* No flow control for RMA/AMO */
                                          true, /* Waiting for replies */
                                          true /* For atomics, data goes both ways, but
                                                  the request is easier to predict */ );
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_TEST_TYPE_STREAM_BI:
        default:
            return UCS_ERR_INVALID_PARAM;
        }
    }
656
private:
    /* Number of operations currently in flight (idle count is 1). */
    inline unsigned outstanding() {
        return m_completion.count - 1;
    }

    ucx_perf_context_t &m_perf;       /* shared perf-test context */
    const unsigned m_max_outstanding; /* send window size (>0, asserted) */
    uct_completion_t m_completion;    /* tracks INPROGRESS operations */
    int m_send_b_count;               /* sends since last forced progress */
    /* this is only valid for UCT AM tests */
    psn_t m_last_recvd_sn;
    /* progress once per this many fast-path sends in send_b() */
    const static int N_SEND_B_PER_PROGRESS = 16;
};
670
671
/* Instantiate and run a compile-time-specialized runner if the runtime
 * parameters match this (command, type, layout, onesided) combination;
 * otherwise fall through to the next expansion. */
#define TEST_CASE(_perf, _cmd, _type, _data, _onesided) \
    if (((_perf)->params.command == (_cmd)) && \
        ((_perf)->params.test_type == (_type)) && \
        ((_perf)->params.uct.data_layout == (_data)) && \
        (!!((_perf)->params.flags & UCX_PERF_TEST_FLAG_ONE_SIDED) == !!(_onesided))) \
    { \
        uct_perf_test_runner<_cmd, _type, _data, _onesided> r(*_perf); \
        return r.run(); \
    }
/* Expand TEST_CASE for both one-sided settings of a (cmd, type) tuple */
#define TEST_CASE_ALL_OSD(_perf, _case, _data) \
    TEST_CASE(_perf, UCS_PP_TUPLE_0 _case, UCS_PP_TUPLE_1 _case, _data, true) \
    TEST_CASE(_perf, UCS_PP_TUPLE_0 _case, UCS_PP_TUPLE_1 _case, _data, false)
/* Expand TEST_CASE for every data layout and one-sided setting */
#define TEST_CASE_ALL_DATA(_perf, _case) \
    TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_SHORT) \
    TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_BCOPY) \
    TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_ZCOPY)
688
/**
 * Entry point: match perf->params against every supported
 * (command, test type, data layout, onesided) combination and run the
 * corresponding specialized test; each TEST_CASE expansion returns on match.
 * @return test status, or UCS_ERR_INVALID_PARAM if nothing matched.
 */
ucs_status_t uct_perf_test_dispatch(ucx_perf_context_t *perf)
{
    UCS_PP_FOREACH(TEST_CASE_ALL_DATA, perf,
                   (UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_PINGPONG),
                   (UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG),
                   (UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_PINGPONG),
                   (UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_STREAM_UNI),
                   (UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI),
                   (UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI),
                   (UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI),
                   (UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI),
                   (UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI),
                   (UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI)
                   );

    ucs_error("Invalid test case");
    return UCS_ERR_INVALID_PARAM;
}
707