1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
3 * Copyright (C) The University of Tennessee and The University
4 *               of Tennessee Research Foundation. 2015. ALL RIGHTS RESERVED.
5 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
6 *
7 * See file LICENSE for terms.
8 */
9 
10 
11 #ifdef HAVE_CONFIG_H
12 #  include "config.h"
13 #endif
14 
15 #include "api/libperf.h"
16 #include "lib/libperf_int.h"
17 
18 #include <ucs/sys/string.h>
19 #include <ucs/sys/sys.h>
20 #include <ucs/sys/sock.h>
21 #include <ucs/debug/log.h>
22 
23 #include <sys/socket.h>
24 #include <arpa/inet.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <netdb.h>
29 #include <getopt.h>
30 #include <string.h>
31 #include <sys/types.h>
32 #include <sys/poll.h>
33 #include <locale.h>
34 #if defined (HAVE_MPI)
35 #  include <mpi.h>
36 #elif defined (HAVE_RTE)
37 #   include<rte.h>
38 #endif
39 
/* Capacity of the batch_files[] / test_names[] arrays in perftest_context */
#define MAX_BATCH_FILES         32
/* Capacity of the cpus[] array in perftest_context */
#define MAX_CPUS                1024
/* Placeholder stored as UCT device / transport name until one is given */
#define TL_RESOURCE_NAME_NONE   "<none>"
/* Option characters in getopt(3) optstring syntax for per-test parameters;
 * presumably the set dispatched to parse_test_params() - confirm at the
 * getopt() call site */
#define TEST_PARAMS_ARGS        "t:n:s:W:O:w:D:i:H:oSCqM:r:T:d:x:A:BUm:"
/* Value of perftest_params_t::test_id while no test has been selected.
 * Parenthesized so the expansion is a single operand in any expression. */
#define TEST_ID_UNDEFINED       (-1)
45 
46 enum {
47     TEST_FLAG_PRINT_RESULTS = UCS_BIT(0),
48     TEST_FLAG_PRINT_TEST    = UCS_BIT(1),
49     TEST_FLAG_SET_AFFINITY  = UCS_BIT(8),
50     TEST_FLAG_NUMERIC_FMT   = UCS_BIT(9),
51     TEST_FLAG_PRINT_FINAL   = UCS_BIT(10),
52     TEST_FLAG_PRINT_CSV     = UCS_BIT(11)
53 };
54 
/*
 * State of the socket-based runtime environment group.
 * NOTE(review): the fields are not used in this part of the file; the
 * descriptions below follow the field names only - confirm with the users.
 */
typedef struct sock_rte_group {
    int is_server; /* presumably non-zero on the server side */
    int connfd;    /* presumably the connected socket descriptor */
} sock_rte_group_t;
59 
60 
61 typedef struct test_type {
62     const char                   *name;
63     ucx_perf_api_t               api;
64     ucx_perf_cmd_t               command;
65     ucx_perf_test_type_t         test_type;
66     const char                   *desc;
67     const char                   *overhead_lat;
68     unsigned                     window_size;
69 } test_type_t;
70 
71 
72 typedef struct perftest_params {
73     ucx_perf_params_t            super;
74     int                          test_id;
75 } perftest_params_t;
76 
77 
78 struct perftest_context {
79     perftest_params_t            params;
80     const char                   *server_addr;
81     int                          port;
82     int                          mpi;
83     unsigned                     num_cpus;
84     unsigned                     cpus[MAX_CPUS];
85     unsigned                     flags;
86 
87     unsigned                     num_batch_files;
88     char                         *batch_files[MAX_BATCH_FILES];
89     char                         *test_names[MAX_BATCH_FILES];
90 
91     sock_rte_group_t             sock_rte_group;
92 };
93 
94 
95 test_type_t tests[] = {
96     {"am_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_PINGPONG,
97      "active message latency", "latency", 1},
98 
99     {"put_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG,
100      "put latency", "latency", 1},
101 
102     {"add_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_PINGPONG,
103      "atomic add latency", "latency", 1},
104 
105     {"get", UCX_PERF_API_UCT, UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI,
106      "get latency / bandwidth / message rate", "latency", 1},
107 
108     {"fadd", UCX_PERF_API_UCT, UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
109      "atomic fetch-and-add latency / rate", "latency", 1},
110 
111     {"swap", UCX_PERF_API_UCT, UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
112      "atomic swap latency / rate", "latency", 1},
113 
114     {"cswap", UCX_PERF_API_UCT, UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
115      "atomic compare-and-swap latency / rate", "latency", 1},
116 
117     {"am_bw", UCX_PERF_API_UCT, UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_STREAM_UNI,
118      "active message bandwidth / message rate", "overhead", 1},
119 
120     {"put_bw", UCX_PERF_API_UCT, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI,
121      "put bandwidth / message rate", "overhead", 1},
122 
123     {"add_mr", UCX_PERF_API_UCT, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
124      "atomic add message rate", "overhead", 1},
125 
126     {"tag_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG, UCX_PERF_TEST_TYPE_PINGPONG,
127      "tag match latency", "latency", 1},
128 
129     {"tag_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG, UCX_PERF_TEST_TYPE_STREAM_UNI,
130      "tag match bandwidth", "overhead", 32},
131 
132     {"tag_sync_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG_SYNC, UCX_PERF_TEST_TYPE_PINGPONG,
133      "tag sync match latency", "latency", 1},
134 
135     {"tag_sync_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG_SYNC, UCX_PERF_TEST_TYPE_STREAM_UNI,
136      "tag sync match bandwidth", "overhead", 32},
137 
138     {"ucp_put_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG,
139      "put latency", "latency", 1},
140 
141     {"ucp_put_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI,
142      "put bandwidth", "overhead", 32},
143 
144     {"ucp_get", UCX_PERF_API_UCP, UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI,
145      "get latency / bandwidth / message rate", "latency", 1},
146 
147     {"ucp_add", UCX_PERF_API_UCP, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
148      "atomic add bandwidth / message rate", "overhead", 1},
149 
150     {"ucp_fadd", UCX_PERF_API_UCP, UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
151      "atomic fetch-and-add latency / bandwidth / rate", "latency", 1},
152 
153     {"ucp_swap", UCX_PERF_API_UCP, UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
154      "atomic swap latency / bandwidth / rate", "latency", 1},
155 
156     {"ucp_cswap", UCX_PERF_API_UCP, UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
157      "atomic compare-and-swap latency / bandwidth / rate", "latency", 1},
158 
159     {"stream_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_STREAM, UCX_PERF_TEST_TYPE_STREAM_UNI,
160      "stream bandwidth", "overhead", 1},
161 
162     {"stream_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_STREAM, UCX_PERF_TEST_TYPE_PINGPONG,
163      "stream latency", "latency", 1},
164 
165      {NULL}
166 };
167 
sock_io(int sock,ssize_t (* sock_call)(int,void *,size_t,int),int poll_events,void * data,size_t size,void (* progress)(void * arg),void * arg,const char * name)168 static int sock_io(int sock, ssize_t (*sock_call)(int, void *, size_t, int),
169                    int poll_events, void *data, size_t size,
170                    void (*progress)(void *arg), void *arg, const char *name)
171 {
172     size_t total = 0;
173     struct pollfd pfd;
174     int ret;
175 
176     while (total < size) {
177         pfd.fd      = sock;
178         pfd.events  = poll_events;
179         pfd.revents = 0;
180 
181         ret = poll(&pfd, 1, 1); /* poll for 1ms */
182         if (ret > 0) {
183             ucs_assert(ret == 1);
184             ucs_assert(pfd.revents & poll_events);
185 
186             ret = sock_call(sock, (char*)data + total, size - total, 0);
187             if (ret < 0) {
188                 ucs_error("%s() failed: %m", name);
189                 return -1;
190             }
191             total += ret;
192         } else if ((ret < 0) && (errno != EINTR)) {
193             ucs_error("poll(fd=%d) failed: %m", sock);
194             return -1;
195         }
196 
197         /* progress user context */
198         if (progress != NULL) {
199             progress(arg);
200         }
201     }
202     return 0;
203 }
204 
/*
 * Adapter which gives send() the exact function type expected by sock_io().
 * send() takes a 'const void *' buffer, so calling it through a pointer cast
 * to a type with a 'void *' parameter is undefined behavior in C.
 */
static ssize_t safe_send_call(int sock, void *data, size_t size, int flags)
{
    return send(sock, data, size, flags);
}

/**
 * Send exactly @a size bytes from @a data over @a sock.
 *
 * @param sock      Socket descriptor.
 * @param data      Buffer to send.
 * @param size      Number of bytes to send.
 * @param progress  Optional callback invoked while waiting; may be NULL.
 * @param arg       Argument passed to @a progress.
 *
 * @return 0 on success, -1 on failure (the result of sock_io()).
 */
static int safe_send(int sock, void *data, size_t size,
                     void (*progress)(void *arg), void *arg)
{
    return sock_io(sock, safe_send_call, POLLOUT, data, size, progress, arg,
                   "send");
}
212 
/**
 * Receive exactly @a size bytes from @a sock into @a data.
 *
 * @param sock      Socket descriptor.
 * @param data      Buffer to fill.
 * @param size      Number of bytes to receive.
 * @param progress  Optional callback invoked while waiting; may be NULL.
 * @param arg       Argument passed to @a progress.
 *
 * @return The result of sock_io(): 0 on success, -1 on failure.
 */
static int safe_recv(int sock, void *data, size_t size,
                     void (*progress)(void *arg), void *arg)
{
    ssize_t (*recv_call)(int, void *, size_t, int) = recv;

    return sock_io(sock, recv_call, POLLIN, data, size, progress, arg, "recv");
}
218 
print_progress(char ** test_names,unsigned num_names,const ucx_perf_result_t * result,unsigned flags,int final,int is_server,int is_multi_thread)219 static void print_progress(char **test_names, unsigned num_names,
220                            const ucx_perf_result_t *result, unsigned flags,
221                            int final, int is_server, int is_multi_thread)
222 {
223     static const char *fmt_csv;
224     static const char *fmt_numeric;
225     static const char *fmt_plain;
226     unsigned i;
227 
228     if (!(flags & TEST_FLAG_PRINT_RESULTS) ||
229         (!final && (flags & TEST_FLAG_PRINT_FINAL)))
230     {
231         return;
232     }
233 
234     if (flags & TEST_FLAG_PRINT_CSV) {
235         for (i = 0; i < num_names; ++i) {
236             printf("%s,", test_names[i]);
237         }
238     }
239 
240 #if _OPENMP
241     if (!final) {
242         printf("[thread %d]", omp_get_thread_num());
243     } else if (flags & TEST_FLAG_PRINT_RESULTS) {
244         printf("Final:    ");
245     }
246 #endif
247 
248     if (is_multi_thread && final) {
249         fmt_csv     = "%4.0f,%.3f,%.2f,%.0f\n";
250         fmt_numeric = "%'18.0f %29.3f %22.2f %'24.0f\n";
251         fmt_plain   = "%18.0f %29.3f %22.2f %23.0f\n";
252 
253         printf((flags & TEST_FLAG_PRINT_CSV)   ? fmt_csv :
254                (flags & TEST_FLAG_NUMERIC_FMT) ? fmt_numeric :
255                                                  fmt_plain,
256                (double)result->iters,
257                result->latency.total_average * 1000000.0,
258                result->bandwidth.total_average / (1024.0 * 1024.0),
259                result->msgrate.total_average);
260     } else {
261         fmt_csv     = "%4.0f,%.3f,%.3f,%.3f,%.2f,%.2f,%.0f,%.0f\n";
262         fmt_numeric = "%'18.0f %9.3f %9.3f %9.3f %11.2f %10.2f %'11.0f %'11.0f\n";
263         fmt_plain   = "%18.0f %9.3f %9.3f %9.3f %11.2f %10.2f %11.0f %11.0f\n";
264 
265         printf((flags & TEST_FLAG_PRINT_CSV)   ? fmt_csv :
266                (flags & TEST_FLAG_NUMERIC_FMT) ? fmt_numeric :
267                                                  fmt_plain,
268                (double)result->iters,
269                result->latency.typical * 1000000.0,
270                result->latency.moment_average * 1000000.0,
271                result->latency.total_average * 1000000.0,
272                result->bandwidth.moment_average / (1024.0 * 1024.0),
273                result->bandwidth.total_average / (1024.0 * 1024.0),
274                result->msgrate.moment_average,
275                result->msgrate.total_average);
276     }
277 
278     fflush(stdout);
279 }
280 
print_header(struct perftest_context * ctx)281 static void print_header(struct perftest_context *ctx)
282 {
283     const char *overhead_lat_str;
284     const char *test_data_str;
285     const char *test_api_str;
286     test_type_t *test;
287     unsigned i;
288 
289     test = (ctx->params.test_id == TEST_ID_UNDEFINED) ? NULL :
290            &tests[ctx->params.test_id];
291 
292     if ((ctx->flags & TEST_FLAG_PRINT_TEST) && (test != NULL)) {
293         if (test->api == UCX_PERF_API_UCT) {
294             test_api_str = "transport layer";
295             switch (ctx->params.super.uct.data_layout) {
296             case UCT_PERF_DATA_LAYOUT_SHORT:
297                 test_data_str = "short";
298                 break;
299             case UCT_PERF_DATA_LAYOUT_BCOPY:
300                 test_data_str = "bcopy";
301                 break;
302             case UCT_PERF_DATA_LAYOUT_ZCOPY:
303                 test_data_str = "zcopy";
304                 break;
305             default:
306                 test_data_str = "(undefined)";
307                 break;
308             }
309         } else if (test->api == UCX_PERF_API_UCP) {
310             test_api_str  = "protocol layer";
311             test_data_str = "(automatic)"; /* TODO contig/stride/stream */
312         } else {
313             return;
314         }
315 
316         printf("+------------------------------------------------------------------------------------------+\n");
317         printf("| API:          %-60s               |\n", test_api_str);
318         printf("| Test:         %-60s               |\n", test->desc);
319         printf("| Data layout:  %-60s               |\n", test_data_str);
320         printf("| Send memory:  %-60s               |\n", ucs_memory_type_names[ctx->params.super.send_mem_type]);
321         printf("| Recv memory:  %-60s               |\n", ucs_memory_type_names[ctx->params.super.recv_mem_type]);
322         printf("| Message size: %-60zu               |\n", ucx_perf_get_message_size(&ctx->params.super));
323     }
324 
325     if (ctx->flags & TEST_FLAG_PRINT_CSV) {
326         if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
327             for (i = 0; i < ctx->num_batch_files; ++i) {
328                 printf("%s,", basename(ctx->batch_files[i]));
329             }
330             printf("iterations,typical_lat,avg_lat,overall_lat,avg_bw,overall_bw,avg_mr,overall_mr\n");
331         }
332     } else {
333         if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
334             overhead_lat_str = (test == NULL) ? "overhead" : test->overhead_lat;
335 
336             printf("+--------------+--------------+-----------------------------+---------------------+-----------------------+\n");
337             printf("|              |              |      %8s (usec)        |   bandwidth (MB/s)  |  message rate (msg/s) |\n", overhead_lat_str);
338             printf("+--------------+--------------+---------+---------+---------+----------+----------+-----------+-----------+\n");
339             printf("|    Stage     | # iterations | typical | average | overall |  average |  overall |  average  |  overall  |\n");
340             printf("+--------------+--------------+---------+---------+---------+----------+----------+-----------+-----------+\n");
341         } else if (ctx->flags & TEST_FLAG_PRINT_TEST) {
342             printf("+------------------------------------------------------------------------------------------+\n");
343         }
344     }
345 }
346 
print_test_name(struct perftest_context * ctx)347 static void print_test_name(struct perftest_context *ctx)
348 {
349     char buf[200];
350     unsigned i, pos;
351 
352     if (!(ctx->flags & TEST_FLAG_PRINT_CSV) && (ctx->num_batch_files > 0)) {
353         strcpy(buf, "+--------------+---------+---------+---------+----------+----------+-----------+-----------+");
354 
355         pos = 1;
356         for (i = 0; i < ctx->num_batch_files; ++i) {
357            if (i != 0) {
358                buf[pos++] = '/';
359            }
360            memcpy(&buf[pos], ctx->test_names[i],
361                   ucs_min(strlen(ctx->test_names[i]), sizeof(buf) - pos - 1));
362            pos += strlen(ctx->test_names[i]);
363         }
364 
365         if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
366             printf("%s\n", buf);
367         }
368     }
369 }
370 
print_memory_type_usage(void)371 static void print_memory_type_usage(void)
372 {
373     ucs_memory_type_t it;
374     for (it = UCS_MEMORY_TYPE_HOST; it < UCS_MEMORY_TYPE_LAST; it++) {
375         if (ucx_perf_mem_type_allocators[it] != NULL) {
376             printf("                        %s - %s\n",
377                    ucs_memory_type_names[it],
378                    ucs_memory_type_descs[it]);
379         }
380     }
381 }
382 
usage(const struct perftest_context * ctx,const char * program)383 static void usage(const struct perftest_context *ctx, const char *program)
384 {
385     static const char* api_names[] = {
386         [UCX_PERF_API_UCT] = "UCT",
387         [UCX_PERF_API_UCP] = "UCP"
388     };
389     test_type_t *test;
390     int UCS_V_UNUSED rank;
391 
392 #ifdef HAVE_MPI
393     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
394     if (ctx->mpi && (rank != 0)) {
395         return;
396     }
397 #endif
398 
399 #if defined (HAVE_MPI)
400     printf("  Note: test can be also launched as an MPI application\n");
401     printf("\n");
402 #elif defined (HAVE_RTE)
403     printf("  Note: this test can be also launched as an libRTE application\n");
404     printf("\n");
405 #endif
406     printf("  Usage: %s [ server-hostname ] [ options ]\n", program);
407     printf("\n");
408     printf("  Common options:\n");
409     printf("     -t <test>      test to run:\n");
410     for (test = tests; test->name; ++test) {
411         printf("    %13s - %s %s\n", test->name,
412                api_names[test->api], test->desc);
413     }
414     printf("\n");
415     printf("     -s <size>      list of scatter-gather sizes for single message (%zu)\n",
416                                 ctx->params.super.msg_size_list[0]);
417     printf("                    for example: \"-s 16,48,8192,8192,14\"\n");
418     printf("     -m <send mem type>[,<recv mem type>]\n");
419     printf("                    memory type of message for sender and receiver (host)\n");
420     print_memory_type_usage();
421     printf("     -n <iters>     number of iterations to run (%ld)\n", ctx->params.super.max_iter);
422     printf("     -w <iters>     number of warm-up iterations (%zu)\n",
423                                 ctx->params.super.warmup_iter);
424     printf("     -c <cpulist>   set affinity to this CPU list (separated by comma) (off)\n");
425     printf("     -O <count>     maximal number of uncompleted outstanding sends\n");
426     printf("     -i <offset>    distance between consecutive scatter-gather entries (%zu)\n",
427                                 ctx->params.super.iov_stride);
428     printf("     -T <threads>   number of threads in the test (%d)\n",
429                                 ctx->params.super.thread_count);
430     printf("     -B             register memory with NONBLOCK flag\n");
431     printf("     -b <file>      read and execute tests from a batch file: every line in the\n");
432     printf("                    file is a test to run, first word is test name, the rest of\n");
433     printf("                    the line is command-line arguments for the test.\n");
434     printf("     -p <port>      TCP port to use for data exchange (%d)\n", ctx->port);
435 #ifdef HAVE_MPI
436     printf("     -P <0|1>       disable/enable MPI mode (%d)\n", ctx->mpi);
437 #endif
438     printf("     -h             show this help message\n");
439     printf("\n");
440     printf("  Output format:\n");
441     printf("     -N             use numeric formatting (thousands separator)\n");
442     printf("     -f             print only final numbers\n");
443     printf("     -v             print CSV-formatted output\n");
444     printf("\n");
445     printf("  UCT only:\n");
446     printf("     -d <device>    device to use for testing\n");
447     printf("     -x <tl>        transport to use for testing\n");
448     printf("     -D <layout>    data layout for sender side:\n");
449     printf("                        short - short messages (default, cannot be used for get)\n");
450     printf("                        bcopy - copy-out (cannot be used for atomics)\n");
451     printf("                        zcopy - zero-copy (cannot be used for atomics)\n");
452     printf("                        iov    - scatter-gather list (iovec)\n");
453     printf("     -W <count>     flow control window size, for active messages (%u)\n",
454                                 ctx->params.super.uct.fc_window);
455     printf("     -H <size>      active message header size (%zu)\n",
456                                 ctx->params.super.am_hdr_size);
457     printf("     -A <mode>      asynchronous progress mode (thread_spinlock)\n");
458     printf("                        thread_spinlock - separate progress thread with spin locking\n");
459     printf("                        thread_mutex - separate progress thread with mutex locking\n");
460     printf("                        signal - signal-based timer\n");
461     printf("\n");
462     printf("  UCP only:\n");
463     printf("     -M <thread>    thread support level for progress engine (single)\n");
464     printf("                        single     - only the master thread can access\n");
465     printf("                        serialized - one thread can access at a time\n");
466     printf("                        multi      - multiple threads can access\n");
467     printf("     -D <layout>[,<layout>]\n");
468     printf("                    data layout for sender and receiver side (contig)\n");
469     printf("                        contig - Continuous datatype\n");
470     printf("                        iov    - Scatter-gather list\n");
471     printf("     -C             use wild-card tag for tag tests\n");
472     printf("     -U             force unexpected flow by using tag probe\n");
473     printf("     -r <mode>      receive mode for stream tests (recv)\n");
474     printf("                        recv       : Use ucp_stream_recv_nb\n");
475     printf("                        recv_data  : Use ucp_stream_recv_data_nb\n");
476     printf("\n");
477     printf("   NOTE: When running UCP tests, transport and device should be specified by\n");
478     printf("         environment variables: UCX_TLS and UCX_[SELF|SHM|NET]_DEVICES.\n");
479     printf("\n");
480 }
481 
parse_ucp_datatype_params(const char * opt_arg,ucp_perf_datatype_t * datatype)482 static ucs_status_t parse_ucp_datatype_params(const char *opt_arg,
483                                               ucp_perf_datatype_t *datatype)
484 {
485     const char  *iov_type         = "iov";
486     const size_t iov_type_size    = strlen("iov");
487     const char  *contig_type      = "contig";
488     const size_t contig_type_size = strlen("contig");
489 
490     if (0 == strncmp(opt_arg, iov_type, iov_type_size)) {
491         *datatype = UCP_PERF_DATATYPE_IOV;
492     } else if (0 == strncmp(opt_arg, contig_type, contig_type_size)) {
493         *datatype = UCP_PERF_DATATYPE_CONTIG;
494     } else {
495         return UCS_ERR_INVALID_PARAM;
496     }
497 
498     return UCS_OK;
499 }
500 
parse_mem_type(const char * opt_arg,ucs_memory_type_t * mem_type)501 static ucs_status_t parse_mem_type(const char *opt_arg,
502                                    ucs_memory_type_t *mem_type)
503 {
504     ucs_memory_type_t it;
505     for (it = UCS_MEMORY_TYPE_HOST; it < UCS_MEMORY_TYPE_LAST; it++) {
506         if(!strcmp(opt_arg, ucs_memory_type_names[it]) &&
507            (ucx_perf_mem_type_allocators[it] != NULL)) {
508             *mem_type = it;
509             return UCS_OK;
510         }
511     }
512     ucs_error("Unsupported memory type: \"%s\"", opt_arg);
513     return UCS_ERR_INVALID_PARAM;
514 }
515 
parse_mem_type_params(const char * opt_arg,ucs_memory_type_t * send_mem_type,ucs_memory_type_t * recv_mem_type)516 static ucs_status_t parse_mem_type_params(const char *opt_arg,
517                                           ucs_memory_type_t *send_mem_type,
518                                           ucs_memory_type_t *recv_mem_type)
519 {
520     const char *delim = ",";
521     char *token       = strtok((char*)opt_arg, delim);
522 
523     if (UCS_OK != parse_mem_type(token, send_mem_type)) {
524         return UCS_ERR_INVALID_PARAM;
525     }
526 
527     token = strtok(NULL, delim);
528     if (NULL == token) {
529         *recv_mem_type = *send_mem_type;
530         return UCS_OK;
531     } else {
532         return parse_mem_type(token, recv_mem_type);
533     }
534 }
535 
parse_message_sizes_params(const char * opt_arg,ucx_perf_params_t * params)536 static ucs_status_t parse_message_sizes_params(const char *opt_arg,
537                                                ucx_perf_params_t *params)
538 {
539     const char delim = ',';
540     size_t *msg_size_list, token_num, token_it;
541     char *optarg_ptr, *optarg_ptr2;
542 
543     optarg_ptr = (char *)opt_arg;
544     token_num  = 0;
545     /* count the number of given message sizes */
546     while ((optarg_ptr = strchr(optarg_ptr, delim)) != NULL) {
547         ++optarg_ptr;
548         ++token_num;
549     }
550     ++token_num;
551 
552     msg_size_list = realloc(params->msg_size_list,
553                             sizeof(*params->msg_size_list) * token_num);
554     if (NULL == msg_size_list) {
555         return UCS_ERR_NO_MEMORY;
556     }
557 
558     params->msg_size_list = msg_size_list;
559 
560     optarg_ptr = (char *)opt_arg;
561     errno = 0;
562     for (token_it = 0; token_it < token_num; ++token_it) {
563         params->msg_size_list[token_it] = strtoul(optarg_ptr, &optarg_ptr2, 10);
564         if (((ERANGE == errno) && (ULONG_MAX == params->msg_size_list[token_it])) ||
565             ((errno != 0) && (params->msg_size_list[token_it] == 0)) ||
566             (optarg_ptr == optarg_ptr2)) {
567             free(params->msg_size_list);
568             params->msg_size_list = NULL; /* prevent double free */
569             ucs_error("Invalid option substring argument at position %lu", token_it);
570             return UCS_ERR_INVALID_PARAM;
571         }
572         optarg_ptr = optarg_ptr2 + 1;
573     }
574 
575     params->msg_size_cnt = token_num;
576     return UCS_OK;
577 }
578 
init_test_params(perftest_params_t * params)579 static ucs_status_t init_test_params(perftest_params_t *params)
580 {
581     memset(params, 0, sizeof(*params));
582     params->super.api               = UCX_PERF_API_LAST;
583     params->super.command           = UCX_PERF_CMD_LAST;
584     params->super.test_type         = UCX_PERF_TEST_TYPE_LAST;
585     params->super.thread_mode       = UCS_THREAD_MODE_SINGLE;
586     params->super.thread_count      = 1;
587     params->super.async_mode        = UCS_ASYNC_THREAD_LOCK_TYPE;
588     params->super.wait_mode         = UCX_PERF_WAIT_MODE_LAST;
589     params->super.max_outstanding   = 0;
590     params->super.warmup_iter       = 10000;
591     params->super.am_hdr_size       = 8;
592     params->super.alignment         = ucs_get_page_size();
593     params->super.max_iter          = 1000000l;
594     params->super.max_time          = 0.0;
595     params->super.report_interval   = 1.0;
596     params->super.flags             = UCX_PERF_TEST_FLAG_VERBOSE;
597     params->super.uct.fc_window     = UCT_PERF_TEST_MAX_FC_WINDOW;
598     params->super.uct.data_layout   = UCT_PERF_DATA_LAYOUT_SHORT;
599     params->super.send_mem_type     = UCS_MEMORY_TYPE_HOST;
600     params->super.recv_mem_type     = UCS_MEMORY_TYPE_HOST;
601     params->super.msg_size_cnt      = 1;
602     params->super.iov_stride        = 0;
603     params->super.ucp.send_datatype = UCP_PERF_DATATYPE_CONTIG;
604     params->super.ucp.recv_datatype = UCP_PERF_DATATYPE_CONTIG;
605     strcpy(params->super.uct.dev_name, TL_RESOURCE_NAME_NONE);
606     strcpy(params->super.uct.tl_name,  TL_RESOURCE_NAME_NONE);
607 
608     params->super.msg_size_list = calloc(params->super.msg_size_cnt,
609                                          sizeof(*params->super.msg_size_list));
610     if (params->super.msg_size_list == NULL) {
611         return UCS_ERR_NO_MEMORY;
612     }
613 
614     params->super.msg_size_list[0] = 8;
615     params->test_id                = TEST_ID_UNDEFINED;
616 
617     return UCS_OK;
618 }
619 
parse_test_params(perftest_params_t * params,char opt,const char * opt_arg)620 static ucs_status_t parse_test_params(perftest_params_t *params, char opt,
621                                       const char *opt_arg)
622 {
623     char *optarg2 = NULL;
624     test_type_t *test;
625     unsigned i;
626 
627     switch (opt) {
628     case 'd':
629         ucs_snprintf_zero(params->super.uct.dev_name,
630                           sizeof(params->super.uct.dev_name), "%s", opt_arg);
631         return UCS_OK;
632     case 'x':
633         ucs_snprintf_zero(params->super.uct.tl_name,
634                           sizeof(params->super.uct.tl_name), "%s", opt_arg);
635         return UCS_OK;
636     case 't':
637         for (i = 0; tests[i].name != NULL; ++i) {
638             test = &tests[i];
639             if (!strcmp(opt_arg, test->name)) {
640                 params->super.api       = test->api;
641                 params->super.command   = test->command;
642                 params->super.test_type = test->test_type;
643                 params->test_id         = i;
644                 break;
645             }
646         }
647         if (params->test_id == TEST_ID_UNDEFINED) {
648             ucs_error("Invalid option argument for -t");
649             return UCS_ERR_INVALID_PARAM;
650         }
651         return UCS_OK;
652     case 'D':
653         if (!strcmp(opt_arg, "short")) {
654             params->super.uct.data_layout   = UCT_PERF_DATA_LAYOUT_SHORT;
655         } else if (!strcmp(opt_arg, "bcopy")) {
656             params->super.uct.data_layout   = UCT_PERF_DATA_LAYOUT_BCOPY;
657         } else if (!strcmp(opt_arg, "zcopy")) {
658             params->super.uct.data_layout   = UCT_PERF_DATA_LAYOUT_ZCOPY;
659         } else if (UCS_OK == parse_ucp_datatype_params(opt_arg,
660                                                        &params->super.ucp.send_datatype)) {
661             optarg2 = strchr(opt_arg, ',');
662             if (optarg2) {
663                 if (UCS_OK != parse_ucp_datatype_params(optarg2 + 1,
664                                                        &params->super.ucp.recv_datatype)) {
665                     return UCS_ERR_INVALID_PARAM;
666                 }
667             }
668         } else {
669             ucs_error("Invalid option argument for -D");
670             return UCS_ERR_INVALID_PARAM;
671         }
672         return UCS_OK;
673     case 'i':
674         params->super.iov_stride = atol(opt_arg);
675         return UCS_OK;
676     case 'n':
677         params->super.max_iter = atol(opt_arg);
678         return UCS_OK;
679     case 's':
680         return parse_message_sizes_params(opt_arg, &params->super);
681     case 'H':
682         params->super.am_hdr_size = atol(opt_arg);
683         return UCS_OK;
684     case 'W':
685         params->super.uct.fc_window = atoi(opt_arg);
686         return UCS_OK;
687     case 'O':
688         params->super.max_outstanding = atoi(opt_arg);
689         return UCS_OK;
690     case 'w':
691         params->super.warmup_iter = atol(opt_arg);
692         return UCS_OK;
693     case 'o':
694         params->super.flags |= UCX_PERF_TEST_FLAG_ONE_SIDED;
695         return UCS_OK;
696     case 'B':
697         params->super.flags |= UCX_PERF_TEST_FLAG_MAP_NONBLOCK;
698         return UCS_OK;
699     case 'q':
700         params->super.flags &= ~UCX_PERF_TEST_FLAG_VERBOSE;
701         return UCS_OK;
702     case 'C':
703         params->super.flags |= UCX_PERF_TEST_FLAG_TAG_WILDCARD;
704         return UCS_OK;
705     case 'U':
706         params->super.flags |= UCX_PERF_TEST_FLAG_TAG_UNEXP_PROBE;
707         return UCS_OK;
708     case 'M':
709         if (!strcmp(opt_arg, "single")) {
710             params->super.thread_mode = UCS_THREAD_MODE_SINGLE;
711             return UCS_OK;
712         } else if (!strcmp(opt_arg, "serialized")) {
713             params->super.thread_mode = UCS_THREAD_MODE_SERIALIZED;
714             return UCS_OK;
715         } else if (!strcmp(opt_arg, "multi")) {
716             params->super.thread_mode = UCS_THREAD_MODE_MULTI;
717             return UCS_OK;
718         } else {
719             ucs_error("Invalid option argument for -M");
720             return UCS_ERR_INVALID_PARAM;
721         }
722     case 'T':
723         params->super.thread_count = atoi(opt_arg);
724         return UCS_OK;
725     case 'A':
726         if (!strcmp(opt_arg, "thread") || !strcmp(opt_arg, "thread_spinlock")) {
727             params->super.async_mode = UCS_ASYNC_MODE_THREAD_SPINLOCK;
728             return UCS_OK;
729         } else if (!strcmp(opt_arg, "thread_mutex")) {
730             params->super.async_mode = UCS_ASYNC_MODE_THREAD_MUTEX;
731             return UCS_OK;
732         } else if (!strcmp(opt_arg, "signal")) {
733             params->super.async_mode = UCS_ASYNC_MODE_SIGNAL;
734             return UCS_OK;
735         } else {
736             ucs_error("Invalid option argument for -A");
737             return UCS_ERR_INVALID_PARAM;
738         }
739     case 'r':
740         if (!strcmp(opt_arg, "recv_data")) {
741             params->super.flags |= UCX_PERF_TEST_FLAG_STREAM_RECV_DATA;
742             return UCS_OK;
743         } else if (!strcmp(opt_arg, "recv")) {
744             params->super.flags &= ~UCX_PERF_TEST_FLAG_STREAM_RECV_DATA;
745             return UCS_OK;
746         }
747         return UCS_ERR_INVALID_PARAM;
748     case 'm':
749         if (UCS_OK != parse_mem_type_params(opt_arg,
750                                             &params->super.send_mem_type,
751                                             &params->super.recv_mem_type)) {
752             return UCS_ERR_INVALID_PARAM;
753         }
754         return UCS_OK;
755     default:
756        return UCS_ERR_INVALID_PARAM;
757     }
758 }
759 
adjust_test_params(perftest_params_t * params,const char * error_prefix)760 static ucs_status_t adjust_test_params(perftest_params_t *params,
761                                        const char *error_prefix)
762 {
763     test_type_t *test;
764 
765     if (params->test_id == TEST_ID_UNDEFINED) {
766         ucs_error("%smissing test name", error_prefix);
767         return UCS_ERR_INVALID_PARAM;
768     }
769 
770     test = &tests[params->test_id];
771 
772     if (params->super.max_outstanding == 0) {
773         params->super.max_outstanding = test->window_size;
774     }
775 
776     return UCS_OK;
777 }
778 
read_batch_file(FILE * batch_file,const char * file_name,int * line_num,perftest_params_t * params,char ** test_name_p)779 static ucs_status_t read_batch_file(FILE *batch_file, const char *file_name,
780                                     int *line_num, perftest_params_t *params,
781                                     char** test_name_p)
782 {
783 #define MAX_SIZE 256
784 #define MAX_ARG_SIZE 2048
785     ucs_status_t status;
786     char buf[MAX_ARG_SIZE];
787     char error_prefix[MAX_ARG_SIZE];
788     int argc;
789     char *argv[MAX_SIZE + 1];
790     int c;
791     char *p;
792 
793     do {
794         if (fgets(buf, sizeof(buf) - 1, batch_file) == NULL) {
795             return UCS_ERR_NO_ELEM;
796         }
797         ++(*line_num);
798 
799         argc = 0;
800         p = strtok(buf, " \t\n\r");
801         while (p && (argc < MAX_SIZE)) {
802             argv[argc++] = p;
803             p = strtok(NULL, " \t\n\r");
804         }
805         argv[argc] = NULL;
806     } while ((argc == 0) || (argv[0][0] == '#'));
807 
808     ucs_snprintf_safe(error_prefix, sizeof(error_prefix),
809                       "in batch file '%s' line %d: ", file_name, *line_num);
810 
811     optind = 1;
812     while ((c = getopt (argc, argv, TEST_PARAMS_ARGS)) != -1) {
813         status = parse_test_params(params, c, optarg);
814         if (status != UCS_OK) {
815             ucs_error("%s-%c %s: %s", error_prefix, c, optarg,
816                       ucs_status_string(status));
817             return status;
818         }
819     }
820 
821     status = adjust_test_params(params, error_prefix);
822     if (status != UCS_OK) {
823         return status;
824     }
825 
826     *test_name_p = strdup(argv[0]);
827     return UCS_OK;
828 }
829 
parse_cpus(char * opt_arg,struct perftest_context * ctx)830 static ucs_status_t parse_cpus(char *opt_arg, struct perftest_context *ctx)
831 {
832     char *endptr, *cpu_list = opt_arg;
833     int cpu;
834 
835     ctx->num_cpus = 0;
836     cpu           = strtol(cpu_list, &endptr, 10);
837 
838     while (((*endptr == ',') || (*endptr == '\0')) && (ctx->num_cpus < MAX_CPUS)) {
839         if (cpu < 0) {
840             ucs_error("invalid cpu number detected: (%d)", cpu);
841             return UCS_ERR_INVALID_PARAM;
842         }
843 
844         ctx->cpus[ctx->num_cpus++] = cpu;
845 
846         if (*endptr == '\0') {
847             break;
848         }
849 
850         cpu_list = endptr + 1; /* skip the comma */
851         cpu      = strtol(cpu_list, &endptr, 10);
852     }
853 
854     if (*endptr == ',') {
855         ucs_error("number of listed cpus exceeds the maximum supported value (%d)",
856                   MAX_CPUS);
857         return UCS_ERR_INVALID_PARAM;
858     }
859 
860     return UCS_OK;
861 }
862 
parse_opts(struct perftest_context * ctx,int mpi_initialized,int argc,char ** argv)863 static ucs_status_t parse_opts(struct perftest_context *ctx, int mpi_initialized,
864                                int argc, char **argv)
865 {
866     ucs_status_t status;
867     int c;
868 
869     ucs_trace_func("");
870 
871     ucx_perf_global_init(); /* initialize memory types */
872 
873     status = init_test_params(&ctx->params);
874     if (status != UCS_OK) {
875         return status;
876     }
877 
878     ctx->server_addr            = NULL;
879     ctx->num_batch_files        = 0;
880     ctx->port                   = 13337;
881     ctx->flags                  = 0;
882     ctx->mpi                    = mpi_initialized;
883 
884     optind = 1;
885     while ((c = getopt (argc, argv, "p:b:Nfvc:P:h" TEST_PARAMS_ARGS)) != -1) {
886         switch (c) {
887         case 'p':
888             ctx->port = atoi(optarg);
889             break;
890         case 'b':
891             if (ctx->num_batch_files < MAX_BATCH_FILES) {
892                 ctx->batch_files[ctx->num_batch_files++] = optarg;
893             }
894             break;
895         case 'N':
896             ctx->flags |= TEST_FLAG_NUMERIC_FMT;
897             break;
898         case 'f':
899             ctx->flags |= TEST_FLAG_PRINT_FINAL;
900             break;
901         case 'v':
902             ctx->flags |= TEST_FLAG_PRINT_CSV;
903             break;
904         case 'c':
905             ctx->flags |= TEST_FLAG_SET_AFFINITY;
906             status = parse_cpus(optarg, ctx);
907             if (status != UCS_OK) {
908                 return status;
909             }
910             break;
911         case 'P':
912 #ifdef HAVE_MPI
913             ctx->mpi = atoi(optarg) && mpi_initialized;
914             break;
915 #endif
916         case 'h':
917             usage(ctx, ucs_basename(argv[0]));
918             return UCS_ERR_CANCELED;
919         default:
920             status = parse_test_params(&ctx->params, c, optarg);
921             if (status != UCS_OK) {
922                 usage(ctx, ucs_basename(argv[0]));
923                 return status;
924             }
925             break;
926         }
927     }
928 
929     if (optind < argc) {
930         ctx->server_addr = argv[optind];
931     }
932 
933     return UCS_OK;
934 }
935 
/*
 * Group size callback of the socket RTE: always reports 2, regardless of
 * the rte_group argument (which is not examined).
 */
static unsigned sock_rte_group_size(void *rte_group)
{
    static const unsigned sock_group_size = 2;

    return sock_group_size;
}
940 
sock_rte_group_index(void * rte_group)941 static unsigned sock_rte_group_index(void *rte_group)
942 {
943     sock_rte_group_t *group = rte_group;
944     return group->is_server ? 0 : 1;
945 }
946 
sock_rte_barrier(void * rte_group,void (* progress)(void * arg),void * arg)947 static void sock_rte_barrier(void *rte_group, void (*progress)(void *arg),
948                              void *arg)
949 {
950 #pragma omp barrier
951 
952 #pragma omp master
953   {
954     sock_rte_group_t *group = rte_group;
955     const unsigned magic = 0xdeadbeef;
956     unsigned snc;
957 
958     snc = magic;
959     safe_send(group->connfd, &snc, sizeof(unsigned), progress, arg);
960 
961     snc = 0;
962     safe_recv(group->connfd, &snc, sizeof(unsigned), progress, arg);
963 
964     ucs_assert(snc == magic);
965   }
966 #pragma omp barrier
967 }
968 
sock_rte_post_vec(void * rte_group,const struct iovec * iovec,int iovcnt,void ** req)969 static void sock_rte_post_vec(void *rte_group, const struct iovec *iovec,
970                               int iovcnt, void **req)
971 {
972     sock_rte_group_t *group = rte_group;
973     size_t size;
974     int i;
975 
976     size = 0;
977     for (i = 0; i < iovcnt; ++i) {
978         size += iovec[i].iov_len;
979     }
980 
981     safe_send(group->connfd, &size, sizeof(size), NULL, NULL);
982     for (i = 0; i < iovcnt; ++i) {
983         safe_send(group->connfd, iovec[i].iov_base, iovec[i].iov_len, NULL,
984                   NULL);
985     }
986 }
987 
sock_rte_recv(void * rte_group,unsigned src,void * buffer,size_t max,void * req)988 static void sock_rte_recv(void *rte_group, unsigned src, void *buffer,
989                           size_t max, void *req)
990 {
991     sock_rte_group_t *group = rte_group;
992     int group_index;
993     size_t size;
994 
995     group_index = sock_rte_group_index(rte_group);
996     if (src == group_index) {
997         return;
998     }
999 
1000     ucs_assert_always(src == (1 - group_index));
1001     safe_recv(group->connfd, &size, sizeof(size), NULL, NULL);
1002     ucs_assert_always(size <= max);
1003     safe_recv(group->connfd, buffer, size, NULL, NULL);
1004 }
1005 
sock_rte_report(void * rte_group,const ucx_perf_result_t * result,void * arg,int is_final,int is_multi_thread)1006 static void sock_rte_report(void *rte_group, const ucx_perf_result_t *result,
1007                             void *arg, int is_final, int is_multi_thread)
1008 {
1009     struct perftest_context *ctx = arg;
1010     print_progress(ctx->test_names, ctx->num_batch_files, result, ctx->flags,
1011                    is_final, ctx->server_addr == NULL, is_multi_thread);
1012 }
1013 
1014 static ucx_perf_rte_t sock_rte = {
1015     .group_size    = sock_rte_group_size,
1016     .group_index   = sock_rte_group_index,
1017     .barrier       = sock_rte_barrier,
1018     .post_vec      = sock_rte_post_vec,
1019     .recv          = sock_rte_recv,
1020     .exchange_vec  = (ucx_perf_rte_exchange_vec_func_t)ucs_empty_function,
1021     .report        = sock_rte_report,
1022 };
1023 
setup_sock_rte(struct perftest_context * ctx)1024 static ucs_status_t setup_sock_rte(struct perftest_context *ctx)
1025 {
1026     struct sockaddr_in inaddr;
1027     struct hostent *he;
1028     ucs_status_t status;
1029     int optval = 1;
1030     int sockfd, connfd;
1031     int ret;
1032 
1033     sockfd = socket(AF_INET, SOCK_STREAM, 0);
1034     if (sockfd < 0) {
1035         ucs_error("socket() failed: %m");
1036         status = UCS_ERR_IO_ERROR;
1037         goto err;
1038     }
1039 
1040     if (ctx->server_addr == NULL) {
1041         optval = 1;
1042         status = ucs_socket_setopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
1043                                    &optval, sizeof(optval));
1044         if (status != UCS_OK) {
1045             goto err_close_sockfd;
1046         }
1047 
1048         inaddr.sin_family      = AF_INET;
1049         inaddr.sin_port        = htons(ctx->port);
1050         inaddr.sin_addr.s_addr = INADDR_ANY;
1051         memset(inaddr.sin_zero, 0, sizeof(inaddr.sin_zero));
1052         ret = bind(sockfd, (struct sockaddr*)&inaddr, sizeof(inaddr));
1053         if (ret < 0) {
1054             ucs_error("bind() failed: %m");
1055             status = UCS_ERR_INVALID_ADDR;
1056             goto err_close_sockfd;
1057         }
1058 
1059         ret = listen(sockfd, 10);
1060         if (ret < 0) {
1061             ucs_error("listen() failed: %m");
1062             status = UCS_ERR_IO_ERROR;
1063             goto err_close_sockfd;
1064         }
1065 
1066         printf("Waiting for connection...\n");
1067 
1068         /* Accept next connection */
1069         connfd = accept(sockfd, NULL, NULL);
1070         if (connfd < 0) {
1071             ucs_error("accept() failed: %m");
1072             status = UCS_ERR_IO_ERROR;
1073             goto err_close_sockfd;
1074         }
1075 
1076         close(sockfd);
1077 
1078         ret = safe_recv(connfd, &ctx->params, sizeof(ctx->params), NULL, NULL);
1079         if (ret) {
1080             status = UCS_ERR_IO_ERROR;
1081             goto err_close_connfd;
1082         }
1083 
1084         if (ctx->params.super.msg_size_cnt) {
1085             ctx->params.super.msg_size_list =
1086                     calloc(ctx->params.super.msg_size_cnt,
1087                            sizeof(*ctx->params.super.msg_size_list));
1088             if (NULL == ctx->params.super.msg_size_list) {
1089                 status = UCS_ERR_NO_MEMORY;
1090                 goto err_close_connfd;
1091             }
1092 
1093             ret = safe_recv(connfd, ctx->params.super.msg_size_list,
1094                             sizeof(*ctx->params.super.msg_size_list) *
1095                             ctx->params.super.msg_size_cnt,
1096                             NULL, NULL);
1097             if (ret) {
1098                 status = UCS_ERR_IO_ERROR;
1099                 goto err_close_connfd;
1100             }
1101         }
1102 
1103         ctx->sock_rte_group.connfd    = connfd;
1104         ctx->sock_rte_group.is_server = 1;
1105     } else {
1106         he = gethostbyname(ctx->server_addr);
1107         if (he == NULL || he->h_addr_list == NULL) {
1108             ucs_error("host %s not found: %s", ctx->server_addr,
1109                       hstrerror(h_errno));
1110             status = UCS_ERR_INVALID_ADDR;
1111             goto err_close_sockfd;
1112         }
1113 
1114         inaddr.sin_family = he->h_addrtype;
1115         inaddr.sin_port   = htons(ctx->port);
1116         ucs_assert(he->h_length == sizeof(inaddr.sin_addr));
1117         memcpy(&inaddr.sin_addr, he->h_addr_list[0], he->h_length);
1118         memset(inaddr.sin_zero, 0, sizeof(inaddr.sin_zero));
1119 
1120         ret = connect(sockfd, (struct sockaddr*)&inaddr, sizeof(inaddr));
1121         if (ret < 0) {
1122             ucs_error("connect() failed: %m");
1123             status = UCS_ERR_UNREACHABLE;
1124             goto err_close_sockfd;
1125         }
1126 
1127         safe_send(sockfd, &ctx->params, sizeof(ctx->params), NULL, NULL);
1128         if (ctx->params.super.msg_size_cnt) {
1129             safe_send(sockfd, ctx->params.super.msg_size_list,
1130                       sizeof(*ctx->params.super.msg_size_list) *
1131                       ctx->params.super.msg_size_cnt,
1132                       NULL, NULL);
1133         }
1134 
1135         ctx->sock_rte_group.connfd    = sockfd;
1136         ctx->sock_rte_group.is_server = 0;
1137     }
1138 
1139     if (ctx->sock_rte_group.is_server) {
1140         ctx->flags |= TEST_FLAG_PRINT_TEST;
1141     } else {
1142         ctx->flags |= TEST_FLAG_PRINT_RESULTS;
1143     }
1144 
1145     ctx->params.super.rte_group  = &ctx->sock_rte_group;
1146     ctx->params.super.rte        = &sock_rte;
1147     ctx->params.super.report_arg = ctx;
1148     return UCS_OK;
1149 
1150 err_close_connfd:
1151     close(connfd);
1152     goto err;
1153 err_close_sockfd:
1154     close(sockfd);
1155 err:
1156     return status;
1157 }
1158 
cleanup_sock_rte(struct perftest_context * ctx)1159 static ucs_status_t cleanup_sock_rte(struct perftest_context *ctx)
1160 {
1161     close(ctx->sock_rte_group.connfd);
1162     return UCS_OK;
1163 }
1164 
1165 #if defined (HAVE_MPI)
mpi_rte_group_size(void * rte_group)1166 static unsigned mpi_rte_group_size(void *rte_group)
1167 {
1168     int size;
1169     MPI_Comm_size(MPI_COMM_WORLD, &size);
1170     return size;
1171 }
1172 
mpi_rte_group_index(void * rte_group)1173 static unsigned mpi_rte_group_index(void *rte_group)
1174 {
1175     int rank;
1176     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1177     return rank;
1178 }
1179 
mpi_rte_barrier(void * rte_group,void (* progress)(void * arg),void * arg)1180 static void mpi_rte_barrier(void *rte_group, void (*progress)(void *arg),
1181                             void *arg)
1182 {
1183     int group_size, my_rank, i;
1184     MPI_Request *reqs;
1185     int nreqs = 0;
1186     int dummy;
1187     int flag;
1188 
1189 #pragma omp barrier
1190 
1191 #pragma omp master
1192   {
1193     /*
1194      * Naive non-blocking barrier implementation over send/recv, to call user
1195      * progress while waiting for completion.
1196      * Not using MPI_Ibarrier to be compatible with MPI-1.
1197      */
1198 
1199     MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
1200     MPI_Comm_size(MPI_COMM_WORLD, &group_size);
1201 
1202     /* allocate maximal possible number of requests */
1203     reqs = (MPI_Request*)alloca(sizeof(*reqs) * group_size);
1204 
1205     if (my_rank == 0) {
1206         /* root gathers "ping" from all other ranks */
1207         for (i = 1; i < group_size; ++i) {
1208             MPI_Irecv(&dummy, 0, MPI_INT,
1209                       i /* source */,
1210                       1 /* tag */,
1211                       MPI_COMM_WORLD,
1212                       &reqs[nreqs++]);
1213         }
1214     } else {
1215         /* every non-root rank sends "ping" and waits for "pong" */
1216         MPI_Send(&dummy, 0, MPI_INT,
1217                  0 /* dest */,
1218                  1 /* tag */,
1219                  MPI_COMM_WORLD);
1220         MPI_Irecv(&dummy, 0, MPI_INT,
1221                   0 /* source */,
1222                   2 /* tag */,
1223                   MPI_COMM_WORLD,
1224                   &reqs[nreqs++]);
1225     }
1226 
1227     /* Waiting for receive requests */
1228     do {
1229         MPI_Testall(nreqs, reqs, &flag, MPI_STATUSES_IGNORE);
1230         progress(arg);
1231     } while (!flag);
1232 
1233     if (my_rank == 0) {
1234         /* root sends "pong" to all ranks */
1235         for (i = 1; i < group_size; ++i) {
1236             MPI_Send(&dummy, 0, MPI_INT,
1237                      i /* dest */,
1238                      2 /* tag */,
1239                      MPI_COMM_WORLD);
1240        }
1241     }
1242   }
1243 #pragma omp barrier
1244 }
1245 
mpi_rte_post_vec(void * rte_group,const struct iovec * iovec,int iovcnt,void ** req)1246 static void mpi_rte_post_vec(void *rte_group, const struct iovec *iovec,
1247                              int iovcnt, void **req)
1248 {
1249     int group_size;
1250     int my_rank;
1251     int dest, i;
1252 
1253     MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
1254     MPI_Comm_size(MPI_COMM_WORLD, &group_size);
1255 
1256     for (dest = 0; dest < group_size; ++dest) {
1257         if (dest == my_rank) {
1258             continue;
1259         }
1260 
1261         for (i = 0; i < iovcnt; ++i) {
1262             MPI_Send(iovec[i].iov_base, iovec[i].iov_len, MPI_BYTE, dest,
1263                      i == (iovcnt - 1), /* Send last iov with tag == 1 */
1264                      MPI_COMM_WORLD);
1265         }
1266     }
1267 
1268     *req = (void*)(uintptr_t)1;
1269 }
1270 
mpi_rte_recv(void * rte_group,unsigned src,void * buffer,size_t max,void * req)1271 static void mpi_rte_recv(void *rte_group, unsigned src, void *buffer, size_t max,
1272                          void *req)
1273 {
1274     MPI_Status status;
1275     size_t offset;
1276     int my_rank;
1277     int count;
1278 
1279     MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
1280     if (src == my_rank) {
1281         return;
1282     }
1283 
1284     offset = 0;
1285     do {
1286         ucs_assert_always(offset < max);
1287         MPI_Recv(buffer + offset, max - offset, MPI_BYTE, src, MPI_ANY_TAG,
1288                  MPI_COMM_WORLD, &status);
1289         MPI_Get_count(&status, MPI_BYTE, &count);
1290         offset += count;
1291     } while (status.MPI_TAG != 1);
1292 }
1293 
mpi_rte_report(void * rte_group,const ucx_perf_result_t * result,void * arg,int is_final,int is_multi_thread)1294 static void mpi_rte_report(void *rte_group, const ucx_perf_result_t *result,
1295                            void *arg, int is_final, int is_multi_thread)
1296 {
1297     struct perftest_context *ctx = arg;
1298     print_progress(ctx->test_names, ctx->num_batch_files, result, ctx->flags,
1299                    is_final, ctx->server_addr == NULL, is_multi_thread);
1300 }
1301 #elif defined (HAVE_RTE)
ext_rte_group_size(void * rte_group)1302 static unsigned ext_rte_group_size(void *rte_group)
1303 {
1304     rte_group_t group = (rte_group_t)rte_group;
1305     return rte_group_size(group);
1306 }
1307 
ext_rte_group_index(void * rte_group)1308 static unsigned ext_rte_group_index(void *rte_group)
1309 {
1310     rte_group_t group = (rte_group_t)rte_group;
1311     return rte_group_rank(group);
1312 }
1313 
ext_rte_barrier(void * rte_group,void (* progress)(void * arg),void * arg)1314 static void ext_rte_barrier(void *rte_group, void (*progress)(void *arg),
1315                             void *arg)
1316 {
1317 #pragma omp barrier
1318 
1319 #pragma omp master
1320   {
1321     rte_group_t group = (rte_group_t)rte_group;
1322     int rc;
1323 
1324     rc = rte_barrier(group);
1325     if (RTE_SUCCESS != rc) {
1326         ucs_error("Failed to rte_barrier");
1327     }
1328   }
1329 #pragma omp barrier
1330 }
1331 
ext_rte_post_vec(void * rte_group,const struct iovec * iovec,int iovcnt,void ** req)1332 static void ext_rte_post_vec(void *rte_group, const struct iovec* iovec,
1333                              int iovcnt, void **req)
1334 {
1335     rte_group_t group = (rte_group_t)rte_group;
1336     rte_srs_session_t session;
1337     rte_iovec_t *r_vec;
1338     int i, rc;
1339 
1340     rc = rte_srs_session_create(group, 0, &session);
1341     if (RTE_SUCCESS != rc) {
1342         ucs_error("Failed to rte_srs_session_create");
1343     }
1344 
1345     r_vec = calloc(iovcnt, sizeof(rte_iovec_t));
1346     if (r_vec == NULL) {
1347         return;
1348     }
1349     for (i = 0; i < iovcnt; ++i) {
1350         r_vec[i].iov_base = iovec[i].iov_base;
1351         r_vec[i].type     = rte_datatype_uint8_t;
1352         r_vec[i].count    = iovec[i].iov_len;
1353     }
1354     rc = rte_srs_set_data(session, "KEY_PERF", r_vec, iovcnt);
1355     if (RTE_SUCCESS != rc) {
1356         ucs_error("Failed to rte_srs_set_data");
1357     }
1358     *req = session;
1359     free(r_vec);
1360 }
1361 
ext_rte_recv(void * rte_group,unsigned src,void * buffer,size_t max,void * req)1362 static void ext_rte_recv(void *rte_group, unsigned src, void *buffer,
1363                          size_t max, void *req)
1364 {
1365     rte_group_t group         = (rte_group_t)rte_group;
1366     rte_srs_session_t session = (rte_srs_session_t)req;
1367     void *rte_buffer = NULL;
1368     rte_iovec_t r_vec;
1369     uint32_t offset;
1370     int size;
1371     int rc;
1372 
1373     rc = rte_srs_get_data(session, rte_group_index_to_ec(group, src),
1374                           "KEY_PERF", &rte_buffer, &size);
1375     if (RTE_SUCCESS != rc) {
1376         ucs_error("Failed to rte_srs_get_data");
1377         return;
1378     }
1379 
1380     r_vec.iov_base = buffer;
1381     r_vec.type     = rte_datatype_uint8_t;
1382     r_vec.count    = max;
1383 
1384     offset = 0;
1385     rte_unpack(&r_vec, rte_buffer, &offset);
1386 
1387     rc = rte_srs_session_destroy(session);
1388     if (RTE_SUCCESS != rc) {
1389         ucs_error("Failed to rte_srs_session_destroy");
1390     }
1391     free(rte_buffer);
1392 }
1393 
ext_rte_exchange_vec(void * rte_group,void * req)1394 static void ext_rte_exchange_vec(void *rte_group, void * req)
1395 {
1396     rte_srs_session_t session = (rte_srs_session_t)req;
1397     int rc;
1398 
1399     rc = rte_srs_exchange_data(session);
1400     if (RTE_SUCCESS != rc) {
1401         ucs_error("Failed to rte_srs_exchange_data");
1402     }
1403 }
1404 
ext_rte_report(void * rte_group,const ucx_perf_result_t * result,void * arg,int is_final,int is_multi_thread)1405 static void ext_rte_report(void *rte_group, const ucx_perf_result_t *result,
1406                            void *arg, int is_final, int is_multi_thread)
1407 {
1408     struct perftest_context *ctx = arg;
1409     print_progress(ctx->test_names, ctx->num_batch_files, result, ctx->flags,
1410                    is_final, ctx->server_addr == NULL, is_multi_thread);
1411 }
1412 
1413 static ucx_perf_rte_t ext_rte = {
1414     .group_size    = ext_rte_group_size,
1415     .group_index   = ext_rte_group_index,
1416     .barrier       = ext_rte_barrier,
1417     .report        = ext_rte_report,
1418     .post_vec      = ext_rte_post_vec,
1419     .recv          = ext_rte_recv,
1420     .exchange_vec  = ext_rte_exchange_vec,
1421 };
1422 #endif
1423 
setup_mpi_rte(struct perftest_context * ctx)1424 static ucs_status_t setup_mpi_rte(struct perftest_context *ctx)
1425 {
1426     ucs_trace_func("");
1427 
1428 #if defined (HAVE_MPI)
1429     static ucx_perf_rte_t mpi_rte = {
1430         .group_size    = mpi_rte_group_size,
1431         .group_index   = mpi_rte_group_index,
1432         .barrier       = mpi_rte_barrier,
1433         .post_vec      = mpi_rte_post_vec,
1434         .recv          = mpi_rte_recv,
1435         .exchange_vec  = (void*)ucs_empty_function,
1436         .report        = mpi_rte_report,
1437     };
1438 
1439     int size, rank;
1440 
1441     MPI_Comm_size(MPI_COMM_WORLD, &size);
1442     if (size != 2) {
1443         ucs_error("This test should run with exactly 2 processes (actual: %d)", size);
1444         return UCS_ERR_INVALID_PARAM;
1445     }
1446 
1447     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1448     if (rank == 1) {
1449         ctx->flags |= TEST_FLAG_PRINT_RESULTS;
1450     }
1451 
1452     ctx->params.super.rte_group  = NULL;
1453     ctx->params.super.rte        = &mpi_rte;
1454     ctx->params.super.report_arg = ctx;
1455 #elif defined (HAVE_RTE)
1456     ctx->params.rte_group         = NULL;
1457     ctx->params.rte               = &mpi_rte;
1458     ctx->params.report_arg        = ctx;
1459     rte_group_t group;
1460 
1461     rte_init(NULL, NULL, &group);
1462     if (1 == rte_group_rank(group)) {
1463         ctx->flags |= TEST_FLAG_PRINT_RESULTS;
1464     }
1465 
1466     ctx->params.super.rte_group  = group;
1467     ctx->params.super.rte        = &ext_rte;
1468     ctx->params.super.report_arg = ctx;
1469 #endif
1470     return UCS_OK;
1471 }
1472 
cleanup_mpi_rte(struct perftest_context * ctx)1473 static ucs_status_t cleanup_mpi_rte(struct perftest_context *ctx)
1474 {
1475 #ifdef HAVE_RTE
1476     rte_finalize();
1477 #endif
1478     return UCS_OK;
1479 }
1480 
check_system(struct perftest_context * ctx)1481 static ucs_status_t check_system(struct perftest_context *ctx)
1482 {
1483     ucs_sys_cpuset_t cpuset;
1484     unsigned i, count, nr_cpus;
1485     int ret;
1486 
1487     ucs_trace_func("");
1488 
1489     ret = sysconf(_SC_NPROCESSORS_CONF);
1490     if (ret < 0) {
1491         ucs_error("failed to get local cpu count: %m");
1492         return UCS_ERR_INVALID_PARAM;
1493     }
1494     nr_cpus = ret;
1495 
1496     memset(&cpuset, 0, sizeof(cpuset));
1497     if (ctx->flags & TEST_FLAG_SET_AFFINITY) {
1498         for (i = 0; i < ctx->num_cpus; i++) {
1499             if (ctx->cpus[i] >= nr_cpus) {
1500                 ucs_error("cpu (%u) out of range (0..%u)", ctx->cpus[i], nr_cpus - 1);
1501                 return UCS_ERR_INVALID_PARAM;
1502             }
1503         }
1504 
1505         for (i = 0; i < ctx->num_cpus; i++) {
1506             CPU_SET(ctx->cpus[i], &cpuset);
1507         }
1508 
1509         ret = ucs_sys_setaffinity(&cpuset);
1510         if (ret) {
1511             ucs_warn("sched_setaffinity() failed: %m");
1512             return UCS_ERR_INVALID_PARAM;
1513         }
1514     } else {
1515         ret = ucs_sys_getaffinity(&cpuset);
1516         if (ret) {
1517             ucs_warn("sched_getaffinity() failed: %m");
1518             return UCS_ERR_INVALID_PARAM;
1519         }
1520 
1521         count = 0;
1522         for (i = 0; i < CPU_SETSIZE; ++i) {
1523             if (CPU_ISSET(i, &cpuset)) {
1524                 ++count;
1525             }
1526         }
1527         if (count > 2) {
1528             ucs_warn("CPU affinity is not set (bound to %u cpus)."
1529                      " Performance may be impacted.", count);
1530         }
1531     }
1532 
1533     return UCS_OK;
1534 }
1535 
clone_params(perftest_params_t * dest,const perftest_params_t * src)1536 static ucs_status_t clone_params(perftest_params_t *dest,
1537                                  const perftest_params_t *src)
1538 {
1539     size_t msg_size_list_size;
1540 
1541     *dest                     = *src;
1542     msg_size_list_size        = dest->super.msg_size_cnt *
1543                                 sizeof(*dest->super.msg_size_list);
1544     dest->super.msg_size_list = malloc(msg_size_list_size);
1545     if (dest->super.msg_size_list == NULL) {
1546         return ((msg_size_list_size != 0) ? UCS_ERR_NO_MEMORY : UCS_OK);
1547     }
1548 
1549     memcpy(dest->super.msg_size_list, src->super.msg_size_list,
1550            msg_size_list_size);
1551     return UCS_OK;
1552 }
1553 
run_test_recurs(struct perftest_context * ctx,const perftest_params_t * parent_params,unsigned depth)1554 static ucs_status_t run_test_recurs(struct perftest_context *ctx,
1555                                     const perftest_params_t *parent_params,
1556                                     unsigned depth)
1557 {
1558     perftest_params_t params;
1559     ucx_perf_result_t result;
1560     ucs_status_t status;
1561     FILE *batch_file;
1562     int line_num;
1563 
1564     ucs_trace_func("depth=%u, num_files=%u", depth, ctx->num_batch_files);
1565 
1566     if (parent_params->super.api == UCX_PERF_API_UCP) {
1567         if (strcmp(parent_params->super.uct.dev_name, TL_RESOURCE_NAME_NONE)) {
1568             ucs_warn("-d '%s' ignored for UCP test; see NOTES section in help message",
1569                      parent_params->super.uct.dev_name);
1570         }
1571         if (strcmp(parent_params->super.uct.tl_name, TL_RESOURCE_NAME_NONE)) {
1572             ucs_warn("-x '%s' ignored for UCP test; see NOTES section in help message",
1573                      parent_params->super.uct.tl_name);
1574         }
1575     }
1576 
1577     if (depth >= ctx->num_batch_files) {
1578         print_test_name(ctx);
1579         return ucx_perf_run(&parent_params->super, &result);
1580     }
1581 
1582     batch_file = fopen(ctx->batch_files[depth], "r");
1583     if (batch_file == NULL) {
1584         ucs_error("Failed to open batch file '%s': %m", ctx->batch_files[depth]);
1585         return UCS_ERR_IO_ERROR;
1586     }
1587 
1588     status = clone_params(&params, parent_params);
1589     if (status != UCS_OK) {
1590         goto out;
1591     }
1592 
1593     line_num = 0;
1594     while ((status = read_batch_file(batch_file, ctx->batch_files[depth],
1595                                      &line_num, &params,
1596                                      &ctx->test_names[depth])) == UCS_OK) {
1597         run_test_recurs(ctx, &params, depth + 1);
1598         free(params.super.msg_size_list);
1599         free(ctx->test_names[depth]);
1600         ctx->test_names[depth] = NULL;
1601 
1602         status = clone_params(&params, parent_params);
1603         if (status != UCS_OK) {
1604             goto out;
1605         }
1606     }
1607 
1608     if (status == UCS_ERR_NO_ELEM) {
1609         status = UCS_OK;
1610     }
1611 
1612     free(params.super.msg_size_list);
1613 out:
1614     fclose(batch_file);
1615     return status;
1616 }
1617 
run_test(struct perftest_context * ctx)1618 static ucs_status_t run_test(struct perftest_context *ctx)
1619 {
1620     const char *error_prefix;
1621     ucs_status_t status;
1622 
1623     ucs_trace_func("");
1624 
1625     setlocale(LC_ALL, "en_US");
1626 
1627     /* no batch files, only command line params */
1628     if (ctx->num_batch_files == 0) {
1629         error_prefix = (ctx->flags & TEST_FLAG_PRINT_RESULTS) ?
1630                        "command line: " : "";
1631         status       = adjust_test_params(&ctx->params, error_prefix);
1632         if (status != UCS_OK) {
1633             return status;
1634         }
1635     }
1636 
1637     print_header(ctx);
1638 
1639     status = run_test_recurs(ctx, &ctx->params, 0);
1640     if (status != UCS_OK) {
1641         ucs_error("Failed to run test: %s", ucs_status_string(status));
1642     }
1643 
1644     return status;
1645 }
1646 
main(int argc,char ** argv)1647 int main(int argc, char **argv)
1648 {
1649     struct perftest_context ctx;
1650     ucs_status_t status;
1651     int mpi_initialized;
1652     int mpi_rte;
1653     int ret;
1654 
1655 #ifdef HAVE_MPI
1656     int provided;
1657 
1658     mpi_initialized = !isatty(0) &&
1659                       /* Using MPI_THREAD_FUNNELED since ucx_perftest supports
1660                        * using multiple threads when only the main one makes
1661                        * MPI calls (which is also suitable for a single threaded
1662                        * run).
1663                        * MPI_THREAD_FUNNELED:
1664                        * The process may be multi-threaded, but only the main
1665                        * thread will make MPI calls (all MPI calls are funneled
1666                        * to the main thread). */
1667                       (MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided) == 0);
1668 
1669     if (mpi_initialized && (provided != MPI_THREAD_FUNNELED)) {
1670         printf("MPI_Init_thread failed to set MPI_THREAD_FUNNELED. (provided = %d)\n",
1671                provided);
1672         ret = -1;
1673         goto out;
1674     }
1675 #else
1676     mpi_initialized = 0;
1677 #endif
1678 
1679     /* Parse command line */
1680     status = parse_opts(&ctx, mpi_initialized, argc, argv);
1681     if (status != UCS_OK) {
1682         ret = (status == UCS_ERR_CANCELED) ? 0 : -127;
1683         goto out_msg_size_list;
1684     }
1685 
1686 #ifdef __COVERITY__
1687     /* coverity[dont_call] */
1688     mpi_rte = rand(); /* Shut up deadcode error */
1689 #endif
1690 
1691     if (ctx.mpi) {
1692         mpi_rte = 1;
1693     } else {
1694 #ifdef HAVE_RTE
1695         mpi_rte = 1;
1696 #else
1697         mpi_rte = 0;
1698 #endif
1699     }
1700 
1701     status = check_system(&ctx);
1702     if (status != UCS_OK) {
1703         ret = -1;
1704         goto out_msg_size_list;
1705     }
1706 
1707     /* Create RTE */
1708     status = (mpi_rte) ? setup_mpi_rte(&ctx) : setup_sock_rte(&ctx);
1709     if (status != UCS_OK) {
1710         ret = -1;
1711         goto out_msg_size_list;
1712     }
1713 
1714     /* Run the test */
1715     status = run_test(&ctx);
1716     if (status != UCS_OK) {
1717         ret = -1;
1718         goto out_cleanup_rte;
1719     }
1720 
1721     ret = 0;
1722 
1723 out_cleanup_rte:
1724     (mpi_rte) ? cleanup_mpi_rte(&ctx) : cleanup_sock_rte(&ctx);
1725 out_msg_size_list:
1726     if (ctx.params.super.msg_size_list) {
1727         free(ctx.params.super.msg_size_list);
1728     }
1729 #if HAVE_MPI
1730 out:
1731 #endif
1732     if (mpi_initialized) {
1733 #ifdef HAVE_MPI
1734         MPI_Finalize();
1735 #endif
1736     }
1737     return ret;
1738 }
1739